YARN-3849. Too much of preemption activity causing continuos killing of containers across queues. (Sunil G via wangda)
(cherry picked from commit 1df39c1efc
)
This commit is contained in:
parent
37a93c2d78
commit
c36090fd3f
|
@ -565,6 +565,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-3888. ApplicationMaster link is broken in RM WebUI when appstate is NEW.
|
YARN-3888. ApplicationMaster link is broken in RM WebUI when appstate is NEW.
|
||||||
(Bibin A Chundatt via xgong)
|
(Bibin A Chundatt via xgong)
|
||||||
|
|
||||||
|
YARN-3849. Too much of preemption activity causing continuos killing of
|
||||||
|
containers across queues. (Sunil G via wangda)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -840,12 +840,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
||||||
synchronized (curQueue) {
|
synchronized (curQueue) {
|
||||||
String queueName = curQueue.getQueueName();
|
String queueName = curQueue.getQueueName();
|
||||||
QueueCapacities qc = curQueue.getQueueCapacities();
|
QueueCapacities qc = curQueue.getQueueCapacities();
|
||||||
float absUsed = qc.getAbsoluteUsedCapacity(partitionToLookAt);
|
|
||||||
float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
|
float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
|
||||||
float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
|
float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
|
||||||
boolean preemptionDisabled = curQueue.getPreemptionDisabled();
|
boolean preemptionDisabled = curQueue.getPreemptionDisabled();
|
||||||
|
|
||||||
Resource current = Resources.multiply(partitionResource, absUsed);
|
Resource current = curQueue.getQueueResourceUsage().getUsed(
|
||||||
|
partitionToLookAt);
|
||||||
Resource guaranteed = Resources.multiply(partitionResource, absCap);
|
Resource guaranteed = Resources.multiply(partitionResource, absCap);
|
||||||
Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
|
Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -372,7 +374,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
appA.getApplicationId(), appA.getAttemptId());
|
appA.getApplicationId(), appA.getAttemptId());
|
||||||
assertTrue("appA should be running on queueB",
|
assertTrue("appA should be running on queueB",
|
||||||
mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
|
mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
|
||||||
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
||||||
|
|
||||||
// Need to call setup() again to reset mDisp
|
// Need to call setup() again to reset mDisp
|
||||||
setup();
|
setup();
|
||||||
|
@ -395,7 +397,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
// Resources should have come from queueE (appC) and neither of queueA's
|
// Resources should have come from queueE (appC) and neither of queueA's
|
||||||
// children.
|
// children.
|
||||||
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
|
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
|
||||||
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -470,8 +472,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
// With all queues preemptable, resources should be taken from queueC(appA)
|
// With all queues preemptable, resources should be taken from queueC(appA)
|
||||||
// and queueD(appB). Resources taken more from queueD(appB) than
|
// and queueD(appB). Resources taken more from queueD(appB) than
|
||||||
// queueC(appA) because it's over its capacity by a larger percentage.
|
// queueC(appA) because it's over its capacity by a larger percentage.
|
||||||
verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
verify(mDisp, times(17)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
||||||
verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB)));
|
verify(mDisp, times(183)).handle(argThat(new IsPreemptionRequestFor(appB)));
|
||||||
|
|
||||||
// Turn off preemption for queueA and it's children. queueF(appC)'s request
|
// Turn off preemption for queueA and it's children. queueF(appC)'s request
|
||||||
// should starve.
|
// should starve.
|
||||||
|
@ -635,7 +637,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
policy.editSchedule();
|
policy.editSchedule();
|
||||||
// verify capacity taken from A1, not B1 despite B1 being far over
|
// verify capacity taken from A1, not B1 despite B1 being far over
|
||||||
// its absolute guaranteed capacity
|
// its absolute guaranteed capacity
|
||||||
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -676,7 +678,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
// we verify both that C has priority on B and D (has it has >0 guarantees)
|
// we verify both that C has priority on B and D (has it has >0 guarantees)
|
||||||
// and that B and D are force to share their over capacity fairly (as they
|
// and that B and D are force to share their over capacity fairly (as they
|
||||||
// are both zero-guarantees) hence D sees some of its containers preempted
|
// are both zero-guarantees) hence D sees some of its containers preempted
|
||||||
verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -703,8 +705,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
|
|
||||||
// XXX note: compensating for rounding error in Resources.multiplyTo
|
// XXX note: compensating for rounding error in Resources.multiplyTo
|
||||||
// which is likely triggered since we use small numbers for readability
|
// which is likely triggered since we use small numbers for readability
|
||||||
verify(mDisp, times(7)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
||||||
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE)));
|
verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appE)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -868,6 +870,34 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
setAMContainer = false;
|
setAMContainer = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPreemptionWithVCoreResource() {
|
||||||
|
int[][] qData = new int[][]{
|
||||||
|
// / A B
|
||||||
|
{100, 100, 100}, // maxcap
|
||||||
|
{5, 1, 1}, // apps
|
||||||
|
{2, 0, 0}, // subqueues
|
||||||
|
};
|
||||||
|
|
||||||
|
// Resources can be set like memory:vcores
|
||||||
|
String[][] resData = new String[][]{
|
||||||
|
// / A B
|
||||||
|
{"100:100", "50:50", "50:50"}, // abs
|
||||||
|
{"10:100", "10:100", "0"}, // used
|
||||||
|
{"70:20", "70:20", "10:100"}, // pending
|
||||||
|
{"0", "0", "0"}, // reserved
|
||||||
|
{"-1", "1:10", "1:10"}, // req granularity
|
||||||
|
};
|
||||||
|
|
||||||
|
// Passing last param as TRUE to use DominantResourceCalculator
|
||||||
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData,
|
||||||
|
true);
|
||||||
|
policy.editSchedule();
|
||||||
|
|
||||||
|
// 5 containers will be preempted here
|
||||||
|
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
||||||
|
}
|
||||||
|
|
||||||
static class IsPreemptionRequestFor
|
static class IsPreemptionRequestFor
|
||||||
extends ArgumentMatcher<ContainerPreemptEvent> {
|
extends ArgumentMatcher<ContainerPreemptEvent> {
|
||||||
private final ApplicationAttemptId appAttId;
|
private final ApplicationAttemptId appAttId;
|
||||||
|
@ -892,13 +922,40 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
}
|
}
|
||||||
|
|
||||||
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
|
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
|
||||||
ProportionalCapacityPreemptionPolicy policy =
|
ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy(
|
||||||
new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
|
conf, rmContext, mCS, mClock);
|
||||||
|
clusterResources = Resource.newInstance(
|
||||||
|
leafAbsCapacities(qData[0], qData[7]), 0);
|
||||||
ParentQueue mRoot = buildMockRootQueue(rand, qData);
|
ParentQueue mRoot = buildMockRootQueue(rand, qData);
|
||||||
when(mCS.getRootQueue()).thenReturn(mRoot);
|
when(mCS.getRootQueue()).thenReturn(mRoot);
|
||||||
|
|
||||||
clusterResources =
|
setResourceAndNodeDetails();
|
||||||
Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
|
return policy;
|
||||||
|
}
|
||||||
|
|
||||||
|
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
|
||||||
|
String[][] resData) {
|
||||||
|
return buildPolicy(qData, resData, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
|
||||||
|
String[][] resData, boolean useDominantResourceCalculator) {
|
||||||
|
if (useDominantResourceCalculator) {
|
||||||
|
when(mCS.getResourceCalculator()).thenReturn(
|
||||||
|
new DominantResourceCalculator());
|
||||||
|
}
|
||||||
|
ProportionalCapacityPreemptionPolicy policy =
|
||||||
|
new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
|
||||||
|
clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]),
|
||||||
|
qData[2]);
|
||||||
|
ParentQueue mRoot = buildMockRootQueue(rand, resData, qData);
|
||||||
|
when(mCS.getRootQueue()).thenReturn(mRoot);
|
||||||
|
|
||||||
|
setResourceAndNodeDetails();
|
||||||
|
return policy;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setResourceAndNodeDetails() {
|
||||||
when(mCS.getClusterResource()).thenReturn(clusterResources);
|
when(mCS.getClusterResource()).thenReturn(clusterResources);
|
||||||
when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
|
when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
|
||||||
clusterResources);
|
clusterResources);
|
||||||
|
@ -906,35 +963,78 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
SchedulerNode mNode = mock(SchedulerNode.class);
|
SchedulerNode mNode = mock(SchedulerNode.class);
|
||||||
when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL);
|
when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL);
|
||||||
when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode);
|
when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode);
|
||||||
return policy;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
|
ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
|
||||||
int[] abs = queueData[0];
|
Resource[] abs = generateResourceList(queueData[0]);
|
||||||
int[] maxCap = queueData[1];
|
Resource[] used = generateResourceList(queueData[2]);
|
||||||
int[] used = queueData[2];
|
Resource[] pending = generateResourceList(queueData[3]);
|
||||||
int[] pending = queueData[3];
|
Resource[] reserved = generateResourceList(queueData[4]);
|
||||||
int[] reserved = queueData[4];
|
Resource[] gran = generateResourceList(queueData[6]);
|
||||||
int[] apps = queueData[5];
|
int[] maxCap = queueData[1];
|
||||||
int[] gran = queueData[6];
|
int[] apps = queueData[5];
|
||||||
int[] queues = queueData[7];
|
int[] queues = queueData[7];
|
||||||
|
|
||||||
return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
|
return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
|
||||||
}
|
}
|
||||||
|
|
||||||
ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
|
ParentQueue buildMockRootQueue(Random r, String[][] resData,
|
||||||
int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
|
int[]... queueData) {
|
||||||
float tot = leafAbsCapacities(abs, queues);
|
Resource[] abs = parseResourceDetails(resData[0]);
|
||||||
|
Resource[] used = parseResourceDetails(resData[1]);
|
||||||
|
Resource[] pending = parseResourceDetails(resData[2]);
|
||||||
|
Resource[] reserved = parseResourceDetails(resData[3]);
|
||||||
|
Resource[] gran = parseResourceDetails(resData[4]);
|
||||||
|
int[] maxCap = queueData[0];
|
||||||
|
int[] apps = queueData[1];
|
||||||
|
int[] queues = queueData[2];
|
||||||
|
|
||||||
|
return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
|
||||||
|
}
|
||||||
|
|
||||||
|
Resource[] parseResourceDetails(String[] resData) {
|
||||||
|
List<Resource> resourceList = new ArrayList<Resource>();
|
||||||
|
for (int i = 0; i < resData.length; i++) {
|
||||||
|
String[] resource = resData[i].split(":");
|
||||||
|
if (resource.length == 1) {
|
||||||
|
resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]), 0));
|
||||||
|
} else {
|
||||||
|
resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]),
|
||||||
|
Integer.valueOf(resource[1])));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return resourceList.toArray(new Resource[resourceList.size()]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Resource[] generateResourceList(int[] qData) {
|
||||||
|
List<Resource> resourceList = new ArrayList<Resource>();
|
||||||
|
for (int i = 0; i < qData.length; i++) {
|
||||||
|
resourceList.add(Resource.newInstance(qData[i], 0));
|
||||||
|
}
|
||||||
|
return resourceList.toArray(new Resource[resourceList.size()]);
|
||||||
|
}
|
||||||
|
|
||||||
|
ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used,
|
||||||
|
Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran,
|
||||||
|
int[] queues) {
|
||||||
|
ResourceCalculator rc = mCS.getResourceCalculator();
|
||||||
|
Resource tot = leafAbsCapacities(abs, queues);
|
||||||
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
|
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
|
||||||
ParentQueue root = mockParentQueue(null, queues[0], pqs);
|
ParentQueue root = mockParentQueue(null, queues[0], pqs);
|
||||||
|
ResourceUsage resUsage = new ResourceUsage();
|
||||||
|
resUsage.setUsed(used[0]);
|
||||||
when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT);
|
when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT);
|
||||||
when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
|
when(root.getAbsoluteUsedCapacity()).thenReturn(
|
||||||
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
|
Resources.divide(rc, tot, used[0], tot));
|
||||||
when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
|
when(root.getAbsoluteCapacity()).thenReturn(
|
||||||
|
Resources.divide(rc, tot, abs[0], tot));
|
||||||
|
when(root.getAbsoluteMaximumCapacity()).thenReturn(
|
||||||
|
maxCap[0] / (float) tot.getMemory());
|
||||||
|
when(root.getQueueResourceUsage()).thenReturn(resUsage);
|
||||||
QueueCapacities rootQc = new QueueCapacities(true);
|
QueueCapacities rootQc = new QueueCapacities(true);
|
||||||
rootQc.setAbsoluteUsedCapacity(used[0] / tot);
|
rootQc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[0], tot));
|
||||||
rootQc.setAbsoluteCapacity(abs[0] / tot);
|
rootQc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[0], tot));
|
||||||
rootQc.setAbsoluteMaximumCapacity(maxCap[0] / tot);
|
rootQc.setAbsoluteMaximumCapacity(maxCap[0] / (float) tot.getMemory());
|
||||||
when(root.getQueueCapacities()).thenReturn(rootQc);
|
when(root.getQueueCapacities()).thenReturn(rootQc);
|
||||||
when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT);
|
when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT);
|
||||||
boolean preemptionDisabled = mockPreemptionStatus("root");
|
boolean preemptionDisabled = mockPreemptionStatus("root");
|
||||||
|
@ -943,28 +1043,35 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
for (int i = 1; i < queues.length; ++i) {
|
for (int i = 1; i < queues.length; ++i) {
|
||||||
final CSQueue q;
|
final CSQueue q;
|
||||||
final ParentQueue p = pqs.removeLast();
|
final ParentQueue p = pqs.removeLast();
|
||||||
final String queueName = "queue" + ((char)('A' + i - 1));
|
final String queueName = "queue" + ((char) ('A' + i - 1));
|
||||||
if (queues[i] > 0) {
|
if (queues[i] > 0) {
|
||||||
q = mockParentQueue(p, queues[i], pqs);
|
q = mockParentQueue(p, queues[i], pqs);
|
||||||
|
ResourceUsage resUsagePerQueue = new ResourceUsage();
|
||||||
|
resUsagePerQueue.setUsed(used[i]);
|
||||||
|
when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue);
|
||||||
} else {
|
} else {
|
||||||
q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
|
q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
|
||||||
}
|
}
|
||||||
when(q.getParent()).thenReturn(p);
|
when(q.getParent()).thenReturn(p);
|
||||||
when(q.getQueueName()).thenReturn(queueName);
|
when(q.getQueueName()).thenReturn(queueName);
|
||||||
when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
|
when(q.getAbsoluteUsedCapacity()).thenReturn(
|
||||||
when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
|
Resources.divide(rc, tot, used[i], tot));
|
||||||
when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
|
when(q.getAbsoluteCapacity()).thenReturn(
|
||||||
|
Resources.divide(rc, tot, abs[i], tot));
|
||||||
|
when(q.getAbsoluteMaximumCapacity()).thenReturn(
|
||||||
|
maxCap[i] / (float) tot.getMemory());
|
||||||
|
|
||||||
// We need to make these fields to QueueCapacities
|
// We need to make these fields to QueueCapacities
|
||||||
QueueCapacities qc = new QueueCapacities(false);
|
QueueCapacities qc = new QueueCapacities(false);
|
||||||
qc.setAbsoluteUsedCapacity(used[i] / tot);
|
qc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[i], tot));
|
||||||
qc.setAbsoluteCapacity(abs[i] / tot);
|
qc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[i], tot));
|
||||||
qc.setAbsoluteMaximumCapacity(maxCap[i] / tot);
|
qc.setAbsoluteMaximumCapacity(maxCap[i] / (float) tot.getMemory());
|
||||||
when(q.getQueueCapacities()).thenReturn(qc);
|
when(q.getQueueCapacities()).thenReturn(qc);
|
||||||
|
|
||||||
String parentPathName = p.getQueuePath();
|
String parentPathName = p.getQueuePath();
|
||||||
parentPathName = (parentPathName == null) ? "root" : parentPathName;
|
parentPathName = (parentPathName == null) ? "root" : parentPathName;
|
||||||
String queuePathName = (parentPathName+"."+queueName).replace("/","root");
|
String queuePathName = (parentPathName + "." + queueName).replace("/",
|
||||||
|
"root");
|
||||||
when(q.getQueuePath()).thenReturn(queuePathName);
|
when(q.getQueuePath()).thenReturn(queuePathName);
|
||||||
preemptionDisabled = mockPreemptionStatus(queuePathName);
|
preemptionDisabled = mockPreemptionStatus(queuePathName);
|
||||||
when(q.getPreemptionDisabled()).thenReturn(preemptionDisabled);
|
when(q.getPreemptionDisabled()).thenReturn(preemptionDisabled);
|
||||||
|
@ -1004,16 +1111,18 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
|
LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs,
|
||||||
int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
|
Resource[] used, Resource[] pending, Resource[] reserved, int[] apps,
|
||||||
|
Resource[] gran) {
|
||||||
LeafQueue lq = mock(LeafQueue.class);
|
LeafQueue lq = mock(LeafQueue.class);
|
||||||
|
ResourceCalculator rc = mCS.getResourceCalculator();
|
||||||
List<ApplicationAttemptId> appAttemptIdList =
|
List<ApplicationAttemptId> appAttemptIdList =
|
||||||
new ArrayList<ApplicationAttemptId>();
|
new ArrayList<ApplicationAttemptId>();
|
||||||
when(lq.getTotalResourcePending()).thenReturn(
|
when(lq.getTotalResourcePending()).thenReturn(pending[i]);
|
||||||
Resource.newInstance(pending[i], 0));
|
|
||||||
// need to set pending resource in resource usage as well
|
// need to set pending resource in resource usage as well
|
||||||
ResourceUsage ru = new ResourceUsage();
|
ResourceUsage ru = new ResourceUsage();
|
||||||
ru.setPending(Resource.newInstance(pending[i], 0));
|
ru.setPending(pending[i]);
|
||||||
|
ru.setUsed(used[i]);
|
||||||
when(lq.getQueueResourceUsage()).thenReturn(ru);
|
when(lq.getQueueResourceUsage()).thenReturn(ru);
|
||||||
// consider moving where CapacityScheduler::comparator accessible
|
// consider moving where CapacityScheduler::comparator accessible
|
||||||
final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
|
final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
|
||||||
|
@ -1026,9 +1135,9 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
});
|
});
|
||||||
// applications are added in global L->R order in queues
|
// applications are added in global L->R order in queues
|
||||||
if (apps[i] != 0) {
|
if (apps[i] != 0) {
|
||||||
int aUsed = used[i] / apps[i];
|
Resource aUsed = Resources.divideAndCeil(rc, used[i], apps[i]);
|
||||||
int aPending = pending[i] / apps[i];
|
Resource aPending = Resources.divideAndCeil(rc, pending[i], apps[i]);
|
||||||
int aReserve = reserved[i] / apps[i];
|
Resource aReserve = Resources.divideAndCeil(rc, reserved[i], apps[i]);
|
||||||
for (int a = 0; a < apps[i]; ++a) {
|
for (int a = 0; a < apps[i]; ++a) {
|
||||||
FiCaSchedulerApp mockFiCaApp =
|
FiCaSchedulerApp mockFiCaApp =
|
||||||
mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]);
|
mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]);
|
||||||
|
@ -1055,9 +1164,10 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
return lq;
|
return lq;
|
||||||
}
|
}
|
||||||
|
|
||||||
FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
|
FiCaSchedulerApp mockApp(int qid, int id, Resource used, Resource pending,
|
||||||
int gran) {
|
Resource reserved, Resource gran) {
|
||||||
FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
|
FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
|
||||||
|
ResourceCalculator rc = mCS.getResourceCalculator();
|
||||||
|
|
||||||
ApplicationId appId = ApplicationId.newInstance(TS, id);
|
ApplicationId appId = ApplicationId.newInstance(TS, id);
|
||||||
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
|
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
|
||||||
|
@ -1065,30 +1175,35 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
when(app.getApplicationAttemptId()).thenReturn(appAttId);
|
when(app.getApplicationAttemptId()).thenReturn(appAttId);
|
||||||
|
|
||||||
int cAlloc = 0;
|
int cAlloc = 0;
|
||||||
Resource unit = Resource.newInstance(gran, 0);
|
Resource unit = gran;
|
||||||
List<RMContainer> cReserved = new ArrayList<RMContainer>();
|
List<RMContainer> cReserved = new ArrayList<RMContainer>();
|
||||||
for (int i = 0; i < reserved; i += gran) {
|
Resource resIter = Resource.newInstance(0, 0);
|
||||||
cReserved.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
|
for (; Resources.lessThan(rc, clusterResources, resIter, reserved); Resources
|
||||||
.getValue()));
|
.addTo(resIter, gran)) {
|
||||||
|
cReserved.add(mockContainer(appAttId, cAlloc, unit,
|
||||||
|
priority.CONTAINER.getValue()));
|
||||||
++cAlloc;
|
++cAlloc;
|
||||||
}
|
}
|
||||||
when(app.getReservedContainers()).thenReturn(cReserved);
|
when(app.getReservedContainers()).thenReturn(cReserved);
|
||||||
|
|
||||||
List<RMContainer> cLive = new ArrayList<RMContainer>();
|
List<RMContainer> cLive = new ArrayList<RMContainer>();
|
||||||
for (int i = 0; i < used; i += gran) {
|
Resource usedIter = Resource.newInstance(0, 0);
|
||||||
if(setAMContainer && i == 0){
|
int i = 0;
|
||||||
cLive.add(mockContainer(appAttId, cAlloc, unit, priority.AMCONTAINER
|
for (; Resources.lessThan(rc, clusterResources, usedIter, used); Resources
|
||||||
.getValue()));
|
.addTo(usedIter, gran)) {
|
||||||
}else if(setLabeledContainer && i ==1){
|
if (setAMContainer && i == 0) {
|
||||||
|
cLive.add(mockContainer(appAttId, cAlloc, unit,
|
||||||
|
priority.AMCONTAINER.getValue()));
|
||||||
|
} else if (setLabeledContainer && i == 1) {
|
||||||
cLive.add(mockContainer(appAttId, cAlloc, unit,
|
cLive.add(mockContainer(appAttId, cAlloc, unit,
|
||||||
priority.LABELEDCONTAINER.getValue()));
|
priority.LABELEDCONTAINER.getValue()));
|
||||||
++used;
|
Resources.addTo(used, Resource.newInstance(1, 1));
|
||||||
}
|
} else {
|
||||||
else{
|
cLive.add(mockContainer(appAttId, cAlloc, unit,
|
||||||
cLive.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
|
priority.CONTAINER.getValue()));
|
||||||
.getValue()));
|
|
||||||
}
|
}
|
||||||
++cAlloc;
|
++cAlloc;
|
||||||
|
++i;
|
||||||
}
|
}
|
||||||
when(app.getLiveContainers()).thenReturn(cLive);
|
when(app.getLiveContainers()).thenReturn(cLive);
|
||||||
return app;
|
return app;
|
||||||
|
@ -1124,6 +1239,16 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static Resource leafAbsCapacities(Resource[] abs, int[] subqueues) {
|
||||||
|
Resource ret = Resource.newInstance(0, 0);
|
||||||
|
for (int i = 0; i < abs.length; ++i) {
|
||||||
|
if (0 == subqueues[i]) {
|
||||||
|
Resources.addTo(ret, abs[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
void printString(CSQueue nq, String indent) {
|
void printString(CSQueue nq, String indent) {
|
||||||
if (nq instanceof ParentQueue) {
|
if (nq instanceof ParentQueue) {
|
||||||
System.out.println(indent + nq.getQueueName()
|
System.out.println(indent + nq.getQueueName()
|
||||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -771,6 +772,60 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
|
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodePartitionPreemptionWithVCoreResource() throws IOException {
|
||||||
|
/**
|
||||||
|
* Queue structure is:
|
||||||
|
*
|
||||||
|
* <pre>
|
||||||
|
* root
|
||||||
|
* / \
|
||||||
|
* a b
|
||||||
|
* </pre>
|
||||||
|
*
|
||||||
|
* Both a/b can access x, and guaranteed capacity of them is 50:50. Two
|
||||||
|
* nodes, n1 has 100 x, n2 has 100 NO_LABEL 4 applications in the cluster,
|
||||||
|
* app1/app2 in a, and app3/app4 in b. app1 uses 80 x, app2 uses 20
|
||||||
|
* NO_LABEL, app3 uses 20 x, app4 uses 80 NO_LABEL. Both a/b have 50 pending
|
||||||
|
* resource for x and NO_LABEL
|
||||||
|
*
|
||||||
|
* After preemption, it should preempt 30 from app1, and 30 from app4.
|
||||||
|
*/
|
||||||
|
String labelsConfig = "=100:200,true;" + // default partition
|
||||||
|
"x=100:200,true"; // partition=x
|
||||||
|
String nodesConfig = "n1=x;" + // n1 has partition=x
|
||||||
|
"n2="; // n2 is default partition
|
||||||
|
String queuesConfig =
|
||||||
|
// guaranteed,max,used,pending
|
||||||
|
"root(=[100:200 100:200 100:200 100:200],x=[100:200 100:200 100:200 100:200]);"
|
||||||
|
+ // root
|
||||||
|
"-a(=[50:100 100:200 20:40 50:100],x=[50:100 100:200 80:160 50:100]);" + // a
|
||||||
|
"-b(=[50:100 100:200 80:160 50:100],x=[50:100 100:200 20:40 50:100])"; // b
|
||||||
|
String appsConfig =
|
||||||
|
// queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||||
|
"a\t" // app1 in a
|
||||||
|
+ "(1,1:2,n1,x,80,false);" + // 80 * x in n1
|
||||||
|
"a\t" // app2 in a
|
||||||
|
+ "(1,1:2,n2,,20,false);" + // 20 default in n2
|
||||||
|
"b\t" // app3 in b
|
||||||
|
+ "(1,1:2,n1,x,20,false);" + // 20 * x in n1
|
||||||
|
"b\t" // app4 in b
|
||||||
|
+ "(1,1:2,n2,,80,false)"; // 80 default in n2
|
||||||
|
|
||||||
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
|
||||||
|
policy.editSchedule();
|
||||||
|
|
||||||
|
// 30 preempted from app1, 30 preempted from app4, and nothing preempted
|
||||||
|
// from app2/app3
|
||||||
|
verify(mDisp, times(30)).handle(
|
||||||
|
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
|
||||||
|
verify(mDisp, times(30)).handle(
|
||||||
|
argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
|
||||||
|
verify(mDisp, never()).handle(
|
||||||
|
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
|
||||||
|
verify(mDisp, never()).handle(
|
||||||
|
argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
|
||||||
|
}
|
||||||
|
|
||||||
private ApplicationAttemptId getAppAttemptId(int id) {
|
private ApplicationAttemptId getAppAttemptId(int id) {
|
||||||
ApplicationId appId = ApplicationId.newInstance(0L, id);
|
ApplicationId appId = ApplicationId.newInstance(0L, id);
|
||||||
|
@ -821,6 +876,16 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
|
|
||||||
private void buildEnv(String labelsConfig, String nodesConfig,
|
private void buildEnv(String labelsConfig, String nodesConfig,
|
||||||
String queuesConfig, String appsConfig) throws IOException {
|
String queuesConfig, String appsConfig) throws IOException {
|
||||||
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void buildEnv(String labelsConfig, String nodesConfig,
|
||||||
|
String queuesConfig, String appsConfig,
|
||||||
|
boolean useDominantResourceCalculator) throws IOException {
|
||||||
|
if (useDominantResourceCalculator) {
|
||||||
|
when(cs.getResourceCalculator()).thenReturn(
|
||||||
|
new DominantResourceCalculator());
|
||||||
|
}
|
||||||
mockNodeLabelsManager(labelsConfig);
|
mockNodeLabelsManager(labelsConfig);
|
||||||
mockSchedulerNodes(nodesConfig);
|
mockSchedulerNodes(nodesConfig);
|
||||||
for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
|
for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
|
||||||
|
@ -832,7 +897,8 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
when(cs.getClusterResource()).thenReturn(clusterResource);
|
when(cs.getClusterResource()).thenReturn(clusterResource);
|
||||||
mockApplications(appsConfig);
|
mockApplications(appsConfig);
|
||||||
|
|
||||||
policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock);
|
policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs,
|
||||||
|
mClock);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
|
private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
|
||||||
|
@ -868,7 +934,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
+ "(priority,resource,host,expression,repeat,reserved)");
|
+ "(priority,resource,host,expression,repeat,reserved)");
|
||||||
}
|
}
|
||||||
Priority pri = Priority.newInstance(Integer.valueOf(values[0]));
|
Priority pri = Priority.newInstance(Integer.valueOf(values[0]));
|
||||||
Resource res = Resources.createResource(Integer.valueOf(values[1]));
|
Resource res = parseResourceFromString(values[1]);
|
||||||
NodeId host = NodeId.newInstance(values[2], 1);
|
NodeId host = NodeId.newInstance(values[2], 1);
|
||||||
String exp = values[3];
|
String exp = values[3];
|
||||||
int repeat = Integer.valueOf(values[4]);
|
int repeat = Integer.valueOf(values[4]);
|
||||||
|
@ -1002,11 +1068,10 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
clusterResource = Resources.createResource(0);
|
clusterResource = Resources.createResource(0);
|
||||||
for (String p : partitionConfigArr) {
|
for (String p : partitionConfigArr) {
|
||||||
String partitionName = p.substring(0, p.indexOf("="));
|
String partitionName = p.substring(0, p.indexOf("="));
|
||||||
int totalResource =
|
Resource res = parseResourceFromString(p.substring(p.indexOf("=") + 1,
|
||||||
Integer.valueOf(p.substring(p.indexOf("=") + 1, p.indexOf(",")));
|
p.indexOf(",")));
|
||||||
boolean exclusivity =
|
boolean exclusivity =
|
||||||
Boolean.valueOf(p.substring(p.indexOf(",") + 1, p.length()));
|
Boolean.valueOf(p.substring(p.indexOf(",") + 1, p.length()));
|
||||||
Resource res = Resources.createResource(totalResource);
|
|
||||||
when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
|
when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
|
||||||
.thenReturn(res);
|
.thenReturn(res);
|
||||||
when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
|
when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
|
||||||
|
@ -1022,6 +1087,18 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
partitionToResource.keySet());
|
partitionToResource.keySet());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Resource parseResourceFromString(String p) {
|
||||||
|
String[] resource = p.split(":");
|
||||||
|
Resource res = Resources.createResource(0);
|
||||||
|
if (resource.length == 1) {
|
||||||
|
res = Resources.createResource(Integer.valueOf(resource[0]));
|
||||||
|
} else {
|
||||||
|
res = Resources.createResource(Integer.valueOf(resource[0]),
|
||||||
|
Integer.valueOf(resource[1]));
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Format is:
|
* Format is:
|
||||||
* <pre>
|
* <pre>
|
||||||
|
@ -1120,23 +1197,22 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
|
||||||
// Add a small epsilon to capacities to avoid truncate when doing
|
// Add a small epsilon to capacities to avoid truncate when doing
|
||||||
// Resources.multiply
|
// Resources.multiply
|
||||||
float epsilon = 1e-6f;
|
float epsilon = 1e-6f;
|
||||||
float absGuaranteed =
|
Resource totResoucePerPartition = partitionToResource.get(partitionName);
|
||||||
Integer.valueOf(values[0].trim())
|
float absGuaranteed = Resources.divide(rc, totResoucePerPartition,
|
||||||
/ (float) (partitionToResource.get(partitionName).getMemory())
|
parseResourceFromString(values[0].trim()), totResoucePerPartition)
|
||||||
+ epsilon;
|
+ epsilon;
|
||||||
float absMax =
|
float absMax = Resources.divide(rc, totResoucePerPartition,
|
||||||
Integer.valueOf(values[1].trim())
|
parseResourceFromString(values[1].trim()), totResoucePerPartition)
|
||||||
/ (float) (partitionToResource.get(partitionName).getMemory())
|
+ epsilon;
|
||||||
+ epsilon;
|
float absUsed = Resources.divide(rc, totResoucePerPartition,
|
||||||
float absUsed =
|
parseResourceFromString(values[2].trim()), totResoucePerPartition)
|
||||||
Integer.valueOf(values[2].trim())
|
+ epsilon;
|
||||||
/ (float) (partitionToResource.get(partitionName).getMemory())
|
Resource pending = parseResourceFromString(values[3].trim());
|
||||||
+ epsilon;
|
|
||||||
Resource pending = Resources.createResource(Integer.valueOf(values[3].trim()));
|
|
||||||
qc.setAbsoluteCapacity(partitionName, absGuaranteed);
|
qc.setAbsoluteCapacity(partitionName, absGuaranteed);
|
||||||
qc.setAbsoluteMaximumCapacity(partitionName, absMax);
|
qc.setAbsoluteMaximumCapacity(partitionName, absMax);
|
||||||
qc.setAbsoluteUsedCapacity(partitionName, absUsed);
|
qc.setAbsoluteUsedCapacity(partitionName, absUsed);
|
||||||
ru.setPending(partitionName, pending);
|
ru.setPending(partitionName, pending);
|
||||||
|
ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
|
||||||
LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
|
LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
|
||||||
+ " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
|
+ " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
|
||||||
+ ",abs_used" + absUsed + ",pending_resource=" + pending + "]");
|
+ ",abs_used" + absUsed + ",pending_resource=" + pending + "]");
|
||||||
|
|
Loading…
Reference in New Issue