YARN-1458. FairScheduler: Zero weight can lead to livelock. (Zhihai Xu via kasha)
(cherry picked from commit 3072c83b38
)
This commit is contained in:
parent
e42b889bdb
commit
a19694f19d
|
@ -279,6 +279,9 @@ Release 2.6.0 - UNRELEASED
|
|||
YARN-2526. SLS can deadlock when all the threads are taken by AMSimulators.
|
||||
(Wei Yan via kasha)
|
||||
|
||||
YARN-1458. FairScheduler: Zero weight can lead to livelock.
|
||||
(Zhihai Xu via kasha)
|
||||
|
||||
Release 2.5.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -48,16 +48,7 @@ public class ComputeFairShares {
|
|||
public static void computeShares(
|
||||
Collection<? extends Schedulable> schedulables, Resource totalResources,
|
||||
ResourceType type) {
|
||||
Collection<Schedulable> activeSchedulables = new ArrayList<Schedulable>();
|
||||
for (Schedulable sched : schedulables) {
|
||||
if ((sched instanceof FSQueue) && !((FSQueue) sched).isActive()) {
|
||||
setResourceValue(0, sched.getFairShare(), type);
|
||||
} else {
|
||||
activeSchedulables.add(sched);
|
||||
}
|
||||
}
|
||||
|
||||
computeSharesInternal(activeSchedulables, totalResources, type, false);
|
||||
computeSharesInternal(schedulables, totalResources, type, false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -117,8 +108,13 @@ public class ComputeFairShares {
|
|||
* iterations of binary search is a constant (dependent on desired precision).
|
||||
*/
|
||||
private static void computeSharesInternal(
|
||||
Collection<? extends Schedulable> schedulables, Resource totalResources,
|
||||
ResourceType type, boolean isSteadyShare) {
|
||||
Collection<? extends Schedulable> allSchedulables,
|
||||
Resource totalResources, ResourceType type, boolean isSteadyShare) {
|
||||
|
||||
Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
|
||||
int takenResources = handleFixedFairShares(
|
||||
allSchedulables, schedulables, isSteadyShare, type);
|
||||
|
||||
if (schedulables.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
@ -135,9 +131,11 @@ public class ComputeFairShares {
|
|||
totalMaxShare += maxShare;
|
||||
}
|
||||
}
|
||||
int totalResource = Math.min(totalMaxShare,
|
||||
getResourceValue(totalResources, type));
|
||||
|
||||
|
||||
int totalResource = Math.max((getResourceValue(totalResources, type) -
|
||||
takenResources), 0);
|
||||
totalResource = Math.min(totalMaxShare, totalResource);
|
||||
|
||||
double rMax = 1.0;
|
||||
while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
|
||||
< totalResource) {
|
||||
|
@ -196,7 +194,64 @@ public class ComputeFairShares {
|
|||
share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
|
||||
return (int) share;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Helper method to handle Schedulabes with fixed fairshares.
|
||||
* Returns the resources taken by fixed fairshare schedulables,
|
||||
* and adds the remaining to the passed nonFixedSchedulables.
|
||||
*/
|
||||
private static int handleFixedFairShares(
|
||||
Collection<? extends Schedulable> schedulables,
|
||||
Collection<Schedulable> nonFixedSchedulables,
|
||||
boolean isSteadyShare, ResourceType type) {
|
||||
int totalResource = 0;
|
||||
|
||||
for (Schedulable sched : schedulables) {
|
||||
int fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
|
||||
if (fixedShare < 0) {
|
||||
nonFixedSchedulables.add(sched);
|
||||
} else {
|
||||
setResourceValue(fixedShare,
|
||||
isSteadyShare
|
||||
? ((FSQueue)sched).getSteadyFairShare()
|
||||
: sched.getFairShare(),
|
||||
type);
|
||||
totalResource = (int) Math.min((long)totalResource + (long)fixedShare,
|
||||
Integer.MAX_VALUE);
|
||||
}
|
||||
}
|
||||
return totalResource;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the fairshare for the {@link Schedulable} if it is fixed, -1 otherwise.
|
||||
*
|
||||
* The fairshare is fixed if either the maxShare is 0, weight is 0,
|
||||
* or the Schedulable is not active for instantaneous fairshare.
|
||||
*/
|
||||
private static int getFairShareIfFixed(Schedulable sched,
|
||||
boolean isSteadyShare, ResourceType type) {
|
||||
|
||||
// Check if maxShare is 0
|
||||
if (getResourceValue(sched.getMaxShare(), type) <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// For instantaneous fairshares, check if queue is active
|
||||
if (!isSteadyShare &&
|
||||
(sched instanceof FSQueue) && !((FSQueue)sched).isActive()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Check if weight is 0
|
||||
if (sched.getWeights().getWeight(type) <= 0) {
|
||||
int minShare = getResourceValue(sched.getMinShare(), type);
|
||||
return (minShare <= 0) ? 0 : minShare;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
private static int getResourceValue(Resource resource, ResourceType type) {
|
||||
switch (type) {
|
||||
case MEMORY:
|
||||
|
|
|
@ -307,7 +307,150 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
assertEquals(3414, p.getMetrics().getSteadyFairShareMB());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testFairShareWithZeroWeight() throws IOException {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
// set queueA and queueB weight zero.
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"queueA\">");
|
||||
out.println("<weight>0.0</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"queueB\">");
|
||||
out.println("<weight>0.0</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
// Add one big node (only care about aggregate capacity)
|
||||
RMNode node1 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||
"127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
|
||||
// Queue A wants 2 * 1024.
|
||||
createSchedulingRequest(2 * 1024, "queueA", "user1");
|
||||
// Queue B wants 6 * 1024
|
||||
createSchedulingRequest(6 * 1024, "queueB", "user1");
|
||||
|
||||
scheduler.update();
|
||||
|
||||
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
|
||||
"queueA", false);
|
||||
// queueA's weight is 0.0, so its fair share should be 0.
|
||||
assertEquals(0, queue.getFairShare().getMemory());
|
||||
// queueB's weight is 0.0, so its fair share should be 0.
|
||||
queue = scheduler.getQueueManager().getLeafQueue(
|
||||
"queueB", false);
|
||||
assertEquals(0, queue.getFairShare().getMemory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFairShareWithZeroWeightNoneZeroMinRes() throws IOException {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
// set queueA and queueB weight zero.
|
||||
// set queueA and queueB minResources 1.
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"queueA\">");
|
||||
out.println("<minResources>1 mb 1 vcores</minResources>");
|
||||
out.println("<weight>0.0</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"queueB\">");
|
||||
out.println("<minResources>1 mb 1 vcores</minResources>");
|
||||
out.println("<weight>0.0</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
// Add one big node (only care about aggregate capacity)
|
||||
RMNode node1 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||
"127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
|
||||
// Queue A wants 2 * 1024.
|
||||
createSchedulingRequest(2 * 1024, "queueA", "user1");
|
||||
// Queue B wants 6 * 1024
|
||||
createSchedulingRequest(6 * 1024, "queueB", "user1");
|
||||
|
||||
scheduler.update();
|
||||
|
||||
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
|
||||
"queueA", false);
|
||||
// queueA's weight is 0.0 and minResources is 1,
|
||||
// so its fair share should be 1 (minShare).
|
||||
assertEquals(1, queue.getFairShare().getMemory());
|
||||
// queueB's weight is 0.0 and minResources is 1,
|
||||
// so its fair share should be 1 (minShare).
|
||||
queue = scheduler.getQueueManager().getLeafQueue(
|
||||
"queueB", false);
|
||||
assertEquals(1, queue.getFairShare().getMemory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFairShareWithNoneZeroWeightNoneZeroMinRes()
|
||||
throws IOException {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
// set queueA and queueB weight 0.5.
|
||||
// set queueA and queueB minResources 1024.
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"queueA\">");
|
||||
out.println("<minResources>1024 mb 1 vcores</minResources>");
|
||||
out.println("<weight>0.5</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"queueB\">");
|
||||
out.println("<minResources>1024 mb 1 vcores</minResources>");
|
||||
out.println("<weight>0.5</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
// Add one big node (only care about aggregate capacity)
|
||||
RMNode node1 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||
"127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
|
||||
// Queue A wants 4 * 1024.
|
||||
createSchedulingRequest(4 * 1024, "queueA", "user1");
|
||||
// Queue B wants 4 * 1024
|
||||
createSchedulingRequest(4 * 1024, "queueB", "user1");
|
||||
|
||||
scheduler.update();
|
||||
|
||||
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
|
||||
"queueA", false);
|
||||
// queueA's weight is 0.5 and minResources is 1024,
|
||||
// so its fair share should be 4096.
|
||||
assertEquals(4096, queue.getFairShare().getMemory());
|
||||
// queueB's weight is 0.5 and minResources is 1024,
|
||||
// so its fair share should be 4096.
|
||||
queue = scheduler.getQueueManager().getLeafQueue(
|
||||
"queueB", false);
|
||||
assertEquals(4096, queue.getFairShare().getMemory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleHierarchicalFairShareCalculation() throws IOException {
|
||||
scheduler.init(conf);
|
||||
|
|
Loading…
Reference in New Issue