From 6b4319cc507dc79b9d70546464ed0f190d303ce6 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Fri, 27 Oct 2017 22:48:29 -0700 Subject: [PATCH] YARN-7299. Fix TestDistributedScheduler. (asuresh) (cherry picked from commit 9c5c68745ed18883ce8bdbdca379025b23f17f60) --- .../scheduler/TestDistributedScheduler.java | 64 ++++++++++++------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java index 736dc3114fa..dee2a205fb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java @@ -76,7 +76,9 @@ public class TestDistributedScheduler { registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList( RemoteNode.newInstance(NodeId.newInstance("a", 1), "http://a:1"), - RemoteNode.newInstance(NodeId.newInstance("b", 2), "http://b:2"))); + RemoteNode.newInstance(NodeId.newInstance("b", 2), "http://b:2"), + RemoteNode.newInstance(NodeId.newInstance("c", 3), "http://c:3"), + RemoteNode.newInstance(NodeId.newInstance("d", 4), "http://d:4"))); final AtomicBoolean flipFlag = new AtomicBoolean(true); Mockito.when( @@ -92,9 +94,17 @@ public class TestDistributedScheduler { RemoteNode.newInstance( NodeId.newInstance("c", 3), "http://c:3"), RemoteNode.newInstance( - NodeId.newInstance("d", 4), "http://d:4"))); + NodeId.newInstance("d", 4), "http://d:4"), + RemoteNode.newInstance( + NodeId.newInstance("e", 5), "http://e:5"), + RemoteNode.newInstance( + NodeId.newInstance("f", 6), "http://f:6"))); } else { return createAllocateResponse(Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("f", 6), "http://f:6"), + RemoteNode.newInstance( + NodeId.newInstance("e", 5), "http://e:5"), RemoteNode.newInstance( NodeId.newInstance("d", 4), "http://d:4"), RemoteNode.newInstance( @@ -117,43 +127,41 @@ public class TestDistributedScheduler { distributedScheduler.allocate(allocateRequest); Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size()); - // Verify equal distribution on hosts a and b, and none on c or d + // Verify equal distribution on hosts a, b, c and d, and none on e / f + // NOTE: No more than 1 container will be allocated on a node in the + // top k list per allocate call. Map> allocs = mapAllocs(allocateResponse, 4); - Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size()); - Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size()); - Assert.assertNull(allocs.get(NodeId.newInstance("c", 3))); - Assert.assertNull(allocs.get(NodeId.newInstance("d", 4))); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("a", 1)).size()); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("b", 2)).size()); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size()); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size()); + Assert.assertNull(allocs.get(NodeId.newInstance("e", 5))); + Assert.assertNull(allocs.get(NodeId.newInstance("f", 6))); // New Allocate request allocateRequest = Records.newRecord(AllocateRequest.class); opportunisticReq = - createResourceRequest(ExecutionType.OPPORTUNISTIC, 6, "*"); + createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*"); allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); - // Verify 6 containers were allocated + // Verify 4 containers were allocated allocateResponse = distributedScheduler.allocate(allocateRequest); - Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size()); + Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size()); // Verify new containers are equally distribution on hosts c and d, // and none on a or b - allocs = mapAllocs(allocateResponse, 6); - Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size()); - Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size()); + allocs = mapAllocs(allocateResponse, 4); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size()); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size()); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("e", 5)).size()); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("f", 6)).size()); Assert.assertNull(allocs.get(NodeId.newInstance("a", 1))); Assert.assertNull(allocs.get(NodeId.newInstance("b", 2))); // Ensure the DistributedScheduler respects the list order.. - // The first request should be allocated to "d" since it is ranked higher - // The second request should be allocated to "c" since the ranking is + // The first request should be allocated to "c" since it is ranked higher + // The second request should be allocated to "f" since the ranking is // flipped on every allocate response. - allocateRequest = Records.newRecord(AllocateRequest.class); - opportunisticReq = - createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); - allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); - allocateResponse = distributedScheduler.allocate(allocateRequest); - allocs = mapAllocs(allocateResponse, 1); - Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size()); - allocateRequest = Records.newRecord(AllocateRequest.class); opportunisticReq = createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); @@ -168,7 +176,15 @@ public class TestDistributedScheduler { allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); allocateResponse = distributedScheduler.allocate(allocateRequest); allocs = mapAllocs(allocateResponse, 1); - Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size()); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("f", 6)).size()); + + allocateRequest = Records.newRecord(AllocateRequest.class); + opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + allocateResponse = distributedScheduler.allocate(allocateRequest); + allocs = mapAllocs(allocateResponse, 1); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size()); } private void registerAM(DistributedScheduler distributedScheduler,