YARN-8555. Parameterize TestSchedulingRequestContainerAllocation(Async) to cover both PC handler options. Contributed by Prabhu Joseph.

(cherry picked from commit 0a1637c750)
Weiwei Yang 2019-02-11 15:53:50 +08:00
parent 3d552b24ba
commit fbd03543d8
2 changed files with 116 additions and 165 deletions
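
In outline, both test classes are reworked around the standard JUnit 4 Parameterized pattern: the @Parameters factory supplies the two placement-constraint handler options, the constructor injects the chosen handler, and setUp() writes it into the YarnConfiguration so every @Test runs once per handler. A condensed, illustrative sketch of that shape (the class and test names below are invented for illustration; the real code is in the diffs that follow):

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

// Illustrative sketch of the parameterization pattern, not code from the patch.
@RunWith(Parameterized.class)
public class PlacementConstraintHandlerParamSketch {

  // One run of every @Test per handler option.
  @Parameters
  public static Object[] placementConstraintHandlers() {
    return new Object[] {
        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER,
        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER};
  }

  private final String placementConstraintHandler;
  private YarnConfiguration conf;

  public PlacementConstraintHandlerParamSketch(String placementConstraintHandler) {
    this.placementConstraintHandler = placementConstraintHandler;
  }

  @Before
  public void setUp() {
    conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    // The injected handler is the only per-run difference.
    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
        placementConstraintHandler);
  }

  @Test(timeout = 30000L)
  public void testHandlerIsApplied() {
    Assert.assertEquals(placementConstraintHandler,
        conf.get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER));
  }
}

Keeping the handler choice in setUp() means individual tests no longer hard-code SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER into their CapacityScheduler configuration, which is what the deletions below remove.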

org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java

@@ -47,10 +47,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.util.ArrayList;
import java.util.Arrays;
@@ -68,28 +70,43 @@ import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinali
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
/**
* Test Container Allocation with SchedulingRequest.
*/
@RunWith(Parameterized.class)
public class TestSchedulingRequestContainerAllocation {
private static final int GB = 1024;
private YarnConfiguration conf;
private String placementConstraintHandler;
RMNodeLabelsManager mgr;
@Parameters
public static Object[] placementConstarintHandlers() {
return new Object[] {
YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER};
}
public TestSchedulingRequestContainerAllocation(
String placementConstraintHandler) {
this.placementConstraintHandler = placementConstraintHandler;
}
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
this.placementConstraintHandler);
mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
}
@Test
@Test(timeout = 30000L)
public void testIntraAppAntiAffinity() throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
new Configuration());
csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@@ -120,18 +137,9 @@ public class TestSchedulingRequestContainerAllocation {
ResourceSizing.newInstance(10, Resource.newInstance(1024, 1)),
Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 4; j++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
}
}
// App1 should get 5 containers allocated (1 AM + 1 node each).
FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
List<Container> allocated = waitForAllocation(4, 3000, am1, nms);
Assert.assertEquals(4, allocated.size());
Assert.assertEquals(4, getContainerNodesNum(allocated));
// Similarly, app1 asks for 10 anti-affinity containers at a different
// priority; they should be satisfied as well.
@@ -141,38 +149,30 @@ public class TestSchedulingRequestContainerAllocation {
ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
Priority.newInstance(2), 1L, ImmutableSet.of("reducer"), "reducer");
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 4; j++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
}
}
// App1 should get 9 containers allocated (1 AM + 8 containers).
Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
allocated = waitForAllocation(4, 3000, am1, nms);
Assert.assertEquals(4, allocated.size());
Assert.assertEquals(4, getContainerNodesNum(allocated));
// Test anti-affinity to both "mapper" and "reducer"; we should get no
// container allocated
am1.allocateIntraAppAntiAffinity(
ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
Priority.newInstance(3), 1L, ImmutableSet.of("reducer2"), "mapper");
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 4; j++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
}
}
// App1 should get 10 containers allocated (1 AM + 9 containers).
Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
boolean caughtException = false;
try {
allocated = waitForAllocation(1, 3000, am1, nms);
} catch (Exception e) {
caughtException = true;
}
Assert.assertTrue(caughtException);
rm1.close();
}
@Test
@Test(timeout = 30000L)
public void testIntraAppAntiAffinityWithMultipleTags() throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
new Configuration());
csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@@ -203,18 +203,9 @@ public class TestSchedulingRequestContainerAllocation {
Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"),
"tag_1_1", "tag_1_2");
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 4; j++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
}
}
// App1 should get 3 containers allocated (1 AM + 2 task).
FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(3, schedulerApp.getLiveContainers().size());
List<Container> allocated = waitForAllocation(2, 3000, am1, nms);
Assert.assertEquals(2, allocated.size());
Assert.assertEquals(2, getContainerNodesNum(allocated));
// app1 asks for 1 anti-affinity container for the same app, anti-affinity
// to tag_1_1/tag_1_2, with allocation_tag = tag_2_1/tag_2_2
@@ -223,33 +214,22 @@ public class TestSchedulingRequestContainerAllocation {
Priority.newInstance(2), 1L, ImmutableSet.of("tag_2_1", "tag_2_2"),
"tag_1_1", "tag_1_2");
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 4; j++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
}
}
List<Container> allocated1 = waitForAllocation(1, 3000, am1, nms);
Assert.assertEquals(1, allocated1.size());
allocated.addAll(allocated1);
Assert.assertEquals(3, getContainerNodesNum(allocated));
// App1 should get 4 containers allocated (1 AM + 2 task (first request) +
// 1 task (2nd request).
Assert.assertEquals(4, schedulerApp.getLiveContainers().size());
// app1 asks for 10 anti-affinity containers for the same app. anti-affinity
// app1 asks for 1 anti-affinity container for the same app, anti-affinity
// to tag_1_1/tag_1_2/tag_2_1/tag_2_2. With allocation_tag = tag_3
am1.allocateIntraAppAntiAffinity(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
Priority.newInstance(3), 1L, ImmutableSet.of("tag_3"),
"tag_1_1", "tag_1_2", "tag_2_1", "tag_2_2");
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 4; j++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
}
}
// App1 should get 1 more containers allocated
// 1 AM + 2 task (first request) + 1 task (2nd request) +
// 1 task (3rd request)
Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
allocated1 = waitForAllocation(1, 3000, am1, nms);
Assert.assertEquals(1, allocated1.size());
allocated.addAll(allocated1);
Assert.assertEquals(4, getContainerNodesNum(allocated));
rm1.close();
}
@@ -260,12 +240,9 @@ public class TestSchedulingRequestContainerAllocation {
* types, see more in TestPlacementConstraintsUtil.
* @throws Exception
*/
@Test
@Test(timeout = 30000L)
public void testInterAppAntiAffinity() throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
new Configuration());
csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@@ -296,13 +273,9 @@ public class TestSchedulingRequestContainerAllocation {
ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 4; j++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
}
}
List<Container> allocated = waitForAllocation(3, 3000, am1, nms);
Assert.assertEquals(3, allocated.size());
Assert.assertEquals(3, getContainerNodesNum(allocated));
System.out.println("Mappers on HOST0: "
+ rmNodes[0].getAllocationTagsWithCount().get("mapper"));
@@ -311,11 +284,6 @@ public class TestSchedulingRequestContainerAllocation {
System.out.println("Mappers on HOST2: "
+ rmNodes[2].getAllocationTagsWithCount().get("mapper"));
// App1 should get 4 containers allocated (1 AM + 3 mappers).
FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(4, schedulerApp.getLiveContainers().size());
// app2 -> c
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms[0]);
@@ -330,18 +298,17 @@ public class TestSchedulingRequestContainerAllocation {
Priority.newInstance(1), 1L, allNs.toString(),
ImmutableSet.of("foo"), "mapper");
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 4; j++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
}
}
List<Container> allocated1 = waitForAllocation(3, 3000, am2, nms);
Assert.assertEquals(3, allocated1.size());
Assert.assertEquals(1, getContainerNodesNum(allocated1));
allocated.addAll(allocated1);
Assert.assertEquals(4, getContainerNodesNum(allocated));
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
am2.getApplicationAttemptId());
// App2 should get 4 containers allocated (1 AM + 3 container).
Assert.assertEquals(4, schedulerApp2.getLiveContainers().size());
// The allocated node should not have mapper tag.
Assert.assertTrue(schedulerApp2.getLiveContainers()
.stream().allMatch(rmContainer -> {
@@ -365,17 +332,11 @@ public class TestSchedulingRequestContainerAllocation {
Priority.newInstance(1), 1L, allNs.toString(),
ImmutableSet.of("mapper"), "mapper");
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 4; j++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
}
}
FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
am3.getApplicationAttemptId());
// App3 should get 2 containers allocated (1 AM + 1 container).
Assert.assertEquals(2, schedulerApp3.getLiveContainers().size());
allocated1 = waitForAllocation(1, 3000, am3, nms);
Assert.assertEquals(1, allocated1.size());
allocated.addAll(allocated1);
Assert.assertEquals(4, getContainerNodesNum(allocated));
rm1.close();
}
@@ -423,12 +384,9 @@ public class TestSchedulingRequestContainerAllocation {
rm1.close();
}
@Test
@Test(timeout = 30000L)
public void testSchedulingRequestWithNullConstraint() throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
new Configuration());
csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@@ -467,14 +425,8 @@ public class TestSchedulingRequestContainerAllocation {
.schedulingRequests(ImmutableList.of(sc)).build();
am1.allocate(request);
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
FiCaSchedulerApp schedApp = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(2, schedApp.getLiveContainers().size());
List<Container> allocated = waitForAllocation(1, 3000, am1, nms);
Assert.assertEquals(1, allocated.size());
// Send another request with null placement constraint,
// ensure there is no NPE while handling this request.
@@ -488,22 +440,19 @@ public class TestSchedulingRequestContainerAllocation {
.schedulingRequests(ImmutableList.of(sc)).build();
am1.allocate(request1);
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
Assert.assertEquals(4, schedApp.getLiveContainers().size());
allocated = waitForAllocation(2, 3000, am1, nms);
Assert.assertEquals(2, allocated.size());
rm1.close();
}
private void doNodeHeartbeat(MockNM... nms) throws Exception {
private static void doNodeHeartbeat(MockNM... nms) throws Exception {
for (MockNM nm : nms) {
nm.nodeHeartbeat(true);
}
}
private List<Container> waitForAllocation(int allocNum, int timeout,
public static List<Container> waitForAllocation(int allocNum, int timeout,
MockAM am, MockNM... nms) throws Exception {
final List<Container> result = new ArrayList<>();
GenericTestUtils.waitFor(() -> {
@@ -553,7 +502,7 @@ public class TestSchedulingRequestContainerAllocation {
.build();
}
private int getContainerNodesNum(List<Container> containers) {
public static int getContainerNodesNum(List<Container> containers) {
Set<NodeId> nodes = new HashSet<>();
if (containers != null) {
containers.forEach(c -> nodes.add(c.getNodeId()));
@@ -566,12 +515,8 @@ public class TestSchedulingRequestContainerAllocation {
// This tests both intra- and inter-app constraints,
// including simple affinity, anti-affinity, cardinality constraints,
// and simple AND composite constraints.
YarnConfiguration config = new YarnConfiguration();
config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(config);
MockRM rm = new MockRM(conf);
try {
rm.start();
@@ -698,12 +643,8 @@ public class TestSchedulingRequestContainerAllocation {
@Test(timeout = 30000L)
public void testMultiAllocationTagsConstraints() throws Exception {
// This test simulates using PC to avoid port conflicts
YarnConfiguration config = new YarnConfiguration();
config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(config);
MockRM rm = new MockRM(conf);
try {
rm.start();
@@ -779,12 +720,7 @@ public class TestSchedulingRequestContainerAllocation {
public void testInterAppConstraintsWithNamespaces() throws Exception {
// This test verifies inter-app constraints with namespaces
// not-self/app-id/app-tag
YarnConfiguration config = new YarnConfiguration();
config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(config);
MockRM rm = new MockRM(conf);
try {
rm.start();

org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
@@ -32,38 +33,59 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.util.List;
/**
* Test SchedulingRequest With Asynchronous Scheduling.
*/
@RunWith(Parameterized.class)
public class TestSchedulingRequestContainerAllocationAsync {
private final int GB = 1024;
private YarnConfiguration conf;
private String placementConstraintHandler;
RMNodeLabelsManager mgr;
@Parameters
public static Object[] placementConstarintHandlers() {
return new Object[] {
YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER};
}
public TestSchedulingRequestContainerAllocationAsync(
String placementConstraintHandler) {
this.placementConstraintHandler = placementConstraintHandler;
}
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
this.placementConstraintHandler);
mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
}
private void testIntraAppAntiAffinityAsync(int numThreads) throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
new Configuration());
conf);
csConf.setInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
numThreads);
csConf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ ".scheduling-interval-ms", 0);
csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@@ -89,24 +111,17 @@ public class TestSchedulingRequestContainerAllocationAsync {
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
// app1 asks for 10 anti-affinity containers for the same app. It should
// only get 4 containers allocated because we only have 4 nodes.
// app1 asks for 1000 anti-affinity containers for the same app. It should
// only get 200 containers allocated because we only have 200 nodes.
am1.allocateIntraAppAntiAffinity(
ResourceSizing.newInstance(1000, Resource.newInstance(1024, 1)),
Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < nNMs; j++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
}
}
// App1 should get #NM + 1 containers allocated (1 node each + 1 AM).
FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(nNMs + 1, schedulerApp.getLiveContainers().size());
List<Container> allocated = TestSchedulingRequestContainerAllocation.
waitForAllocation(nNMs, 6000, am1, nms);
Assert.assertEquals(nNMs, allocated.size());
Assert.assertEquals(nNMs, TestSchedulingRequestContainerAllocation.
getContainerNodesNum(allocated));
rm1.close();
}
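
Where the old tests drove the CapacityScheduler directly with NodeUpdateSchedulerEvent loops and then asserted on getLiveContainers(), the parameterized versions poll through waitForAllocation/getContainerNodesNum, since the processor and scheduler handlers allocate on different code paths and timings. The helper's full body is not reproduced in the hunks above; a minimal sketch of such a poller, assuming MockAM#allocate with empty ask/release lists returns the newly allocated containers, might look like:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;

// Illustrative sketch only, not the helper from the patch.
final class AllocationWaitSketch {

  // Poll until at least allocNum containers have been handed to the AM,
  // heartbeating the NMs on every check so either handler can make progress.
  static List<Container> waitForAllocation(int allocNum, int timeoutMs,
      MockAM am, MockNM... nms) throws TimeoutException, InterruptedException {
    final List<Container> result = new ArrayList<>();
    GenericTestUtils.waitFor(() -> {
      try {
        for (MockNM nm : nms) {
          nm.nodeHeartbeat(true);
        }
        // Empty ask/release lists: just pull back whatever was allocated.
        List<ResourceRequest> ask = new ArrayList<>();
        List<ContainerId> release = new ArrayList<>();
        result.addAll(am.allocate(ask, release).getAllocatedContainers());
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      return result.size() >= allocNum;
    }, 100, timeoutMs);
    return result;
  }
}

The 100 ms poll interval here is an arbitrary choice for illustration; the real tests additionally assert on getContainerNodesNum to check how many distinct nodes the returned containers landed on.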