YARN-1458. FairScheduler: Zero weight can lead to livelock. (Zhihai Xu via kasha)

This commit is contained in:
Karthik Kambatla 2014-09-10 08:26:14 -07:00
parent b100949404
commit 3072c83b38
3 changed files with 218 additions and 17 deletions

View File

@ -305,6 +305,9 @@ Release 2.6.0 - UNRELEASED
YARN-2526. SLS can deadlock when all the threads are taken by AMSimulators. YARN-2526. SLS can deadlock when all the threads are taken by AMSimulators.
(Wei Yan via kasha) (Wei Yan via kasha)
YARN-1458. FairScheduler: Zero weight can lead to livelock.
(Zhihai Xu via kasha)
Release 2.5.1 - UNRELEASED Release 2.5.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -48,16 +48,7 @@ public class ComputeFairShares {
public static void computeShares( public static void computeShares(
Collection<? extends Schedulable> schedulables, Resource totalResources, Collection<? extends Schedulable> schedulables, Resource totalResources,
ResourceType type) { ResourceType type) {
Collection<Schedulable> activeSchedulables = new ArrayList<Schedulable>(); computeSharesInternal(schedulables, totalResources, type, false);
for (Schedulable sched : schedulables) {
if ((sched instanceof FSQueue) && !((FSQueue) sched).isActive()) {
setResourceValue(0, sched.getFairShare(), type);
} else {
activeSchedulables.add(sched);
}
}
computeSharesInternal(activeSchedulables, totalResources, type, false);
} }
/** /**
@ -117,8 +108,13 @@ public class ComputeFairShares {
* iterations of binary search is a constant (dependent on desired precision). * iterations of binary search is a constant (dependent on desired precision).
*/ */
private static void computeSharesInternal( private static void computeSharesInternal(
Collection<? extends Schedulable> schedulables, Resource totalResources, Collection<? extends Schedulable> allSchedulables,
ResourceType type, boolean isSteadyShare) { Resource totalResources, ResourceType type, boolean isSteadyShare) {
Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
int takenResources = handleFixedFairShares(
allSchedulables, schedulables, isSteadyShare, type);
if (schedulables.isEmpty()) { if (schedulables.isEmpty()) {
return; return;
} }
@ -135,8 +131,10 @@ public class ComputeFairShares {
totalMaxShare += maxShare; totalMaxShare += maxShare;
} }
} }
int totalResource = Math.min(totalMaxShare,
getResourceValue(totalResources, type)); int totalResource = Math.max((getResourceValue(totalResources, type) -
takenResources), 0);
totalResource = Math.min(totalMaxShare, totalResource);
double rMax = 1.0; double rMax = 1.0;
while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type) while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
@ -197,6 +195,63 @@ public class ComputeFairShares {
return (int) share; return (int) share;
} }
/**
* Helper method to handle Schedulabes with fixed fairshares.
* Returns the resources taken by fixed fairshare schedulables,
* and adds the remaining to the passed nonFixedSchedulables.
*/
private static int handleFixedFairShares(
Collection<? extends Schedulable> schedulables,
Collection<Schedulable> nonFixedSchedulables,
boolean isSteadyShare, ResourceType type) {
int totalResource = 0;
for (Schedulable sched : schedulables) {
int fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
if (fixedShare < 0) {
nonFixedSchedulables.add(sched);
} else {
setResourceValue(fixedShare,
isSteadyShare
? ((FSQueue)sched).getSteadyFairShare()
: sched.getFairShare(),
type);
totalResource = (int) Math.min((long)totalResource + (long)fixedShare,
Integer.MAX_VALUE);
}
}
return totalResource;
}
/**
* Get the fairshare for the {@link Schedulable} if it is fixed, -1 otherwise.
*
* The fairshare is fixed if either the maxShare is 0, weight is 0,
* or the Schedulable is not active for instantaneous fairshare.
*/
private static int getFairShareIfFixed(Schedulable sched,
boolean isSteadyShare, ResourceType type) {
// Check if maxShare is 0
if (getResourceValue(sched.getMaxShare(), type) <= 0) {
return 0;
}
// For instantaneous fairshares, check if queue is active
if (!isSteadyShare &&
(sched instanceof FSQueue) && !((FSQueue)sched).isActive()) {
return 0;
}
// Check if weight is 0
if (sched.getWeights().getWeight(type) <= 0) {
int minShare = getResourceValue(sched.getMinShare(), type);
return (minShare <= 0) ? 0 : minShare;
}
return -1;
}
private static int getResourceValue(Resource resource, ResourceType type) { private static int getResourceValue(Resource resource, ResourceType type) {
switch (type) { switch (type) {
case MEMORY: case MEMORY:

View File

@ -308,6 +308,149 @@ public class TestFairScheduler extends FairSchedulerTestBase {
} }
} }
@Test
public void testFairShareWithZeroWeight() throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
// set queueA and queueB weight zero.
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<weight>0.0</weight>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>0.0</weight>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Queue A wants 2 * 1024.
createSchedulingRequest(2 * 1024, "queueA", "user1");
// Queue B wants 6 * 1024
createSchedulingRequest(6 * 1024, "queueB", "user1");
scheduler.update();
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
"queueA", false);
// queueA's weight is 0.0, so its fair share should be 0.
assertEquals(0, queue.getFairShare().getMemory());
// queueB's weight is 0.0, so its fair share should be 0.
queue = scheduler.getQueueManager().getLeafQueue(
"queueB", false);
assertEquals(0, queue.getFairShare().getMemory());
}
@Test
public void testFairShareWithZeroWeightNoneZeroMinRes() throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
// set queueA and queueB weight zero.
// set queueA and queueB minResources 1.
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<minResources>1 mb 1 vcores</minResources>");
out.println("<weight>0.0</weight>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<minResources>1 mb 1 vcores</minResources>");
out.println("<weight>0.0</weight>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Queue A wants 2 * 1024.
createSchedulingRequest(2 * 1024, "queueA", "user1");
// Queue B wants 6 * 1024
createSchedulingRequest(6 * 1024, "queueB", "user1");
scheduler.update();
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
"queueA", false);
// queueA's weight is 0.0 and minResources is 1,
// so its fair share should be 1 (minShare).
assertEquals(1, queue.getFairShare().getMemory());
// queueB's weight is 0.0 and minResources is 1,
// so its fair share should be 1 (minShare).
queue = scheduler.getQueueManager().getLeafQueue(
"queueB", false);
assertEquals(1, queue.getFairShare().getMemory());
}
@Test
public void testFairShareWithNoneZeroWeightNoneZeroMinRes()
throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
// set queueA and queueB weight 0.5.
// set queueA and queueB minResources 1024.
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<minResources>1024 mb 1 vcores</minResources>");
out.println("<weight>0.5</weight>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<minResources>1024 mb 1 vcores</minResources>");
out.println("<weight>0.5</weight>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Queue A wants 4 * 1024.
createSchedulingRequest(4 * 1024, "queueA", "user1");
// Queue B wants 4 * 1024
createSchedulingRequest(4 * 1024, "queueB", "user1");
scheduler.update();
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
"queueA", false);
// queueA's weight is 0.5 and minResources is 1024,
// so its fair share should be 4096.
assertEquals(4096, queue.getFairShare().getMemory());
// queueB's weight is 0.5 and minResources is 1024,
// so its fair share should be 4096.
queue = scheduler.getQueueManager().getLeafQueue(
"queueB", false);
assertEquals(4096, queue.getFairShare().getMemory());
}
@Test @Test
public void testSimpleHierarchicalFairShareCalculation() throws IOException { public void testSimpleHierarchicalFairShareCalculation() throws IOException {
scheduler.init(conf); scheduler.init(conf);