YARN-7299. Fix TestDistributedScheduler. (asuresh)

This commit is contained in:
Arun Suresh 2017-10-27 22:48:29 -07:00
parent 24f8c5cce3
commit 9c5c68745e
1 changed files with 40 additions and 24 deletions

View File

@ -76,7 +76,9 @@ public class TestDistributedScheduler {
registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList( registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList(
RemoteNode.newInstance(NodeId.newInstance("a", 1), "http://a:1"), RemoteNode.newInstance(NodeId.newInstance("a", 1), "http://a:1"),
RemoteNode.newInstance(NodeId.newInstance("b", 2), "http://b:2"))); RemoteNode.newInstance(NodeId.newInstance("b", 2), "http://b:2"),
RemoteNode.newInstance(NodeId.newInstance("c", 3), "http://c:3"),
RemoteNode.newInstance(NodeId.newInstance("d", 4), "http://d:4")));
final AtomicBoolean flipFlag = new AtomicBoolean(true); final AtomicBoolean flipFlag = new AtomicBoolean(true);
Mockito.when( Mockito.when(
@ -92,9 +94,17 @@ public class TestDistributedScheduler {
RemoteNode.newInstance( RemoteNode.newInstance(
NodeId.newInstance("c", 3), "http://c:3"), NodeId.newInstance("c", 3), "http://c:3"),
RemoteNode.newInstance( RemoteNode.newInstance(
NodeId.newInstance("d", 4), "http://d:4"))); NodeId.newInstance("d", 4), "http://d:4"),
RemoteNode.newInstance(
NodeId.newInstance("e", 5), "http://e:5"),
RemoteNode.newInstance(
NodeId.newInstance("f", 6), "http://f:6")));
} else { } else {
return createAllocateResponse(Arrays.asList( return createAllocateResponse(Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("f", 6), "http://f:6"),
RemoteNode.newInstance(
NodeId.newInstance("e", 5), "http://e:5"),
RemoteNode.newInstance( RemoteNode.newInstance(
NodeId.newInstance("d", 4), "http://d:4"), NodeId.newInstance("d", 4), "http://d:4"),
RemoteNode.newInstance( RemoteNode.newInstance(
@ -117,43 +127,41 @@ public class TestDistributedScheduler {
distributedScheduler.allocate(allocateRequest); distributedScheduler.allocate(allocateRequest);
Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size()); Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
// Verify equal distribution on hosts a and b, and none on c or d // Verify equal distribution on hosts a, b, c and d, and none on e / f
// NOTE: No more than 1 container will be allocated on a node in the
// top k list per allocate call.
Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4); Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4);
Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size()); Assert.assertEquals(1, allocs.get(NodeId.newInstance("a", 1)).size());
Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size()); Assert.assertEquals(1, allocs.get(NodeId.newInstance("b", 2)).size());
Assert.assertNull(allocs.get(NodeId.newInstance("c", 3))); Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
Assert.assertNull(allocs.get(NodeId.newInstance("d", 4))); Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
Assert.assertNull(allocs.get(NodeId.newInstance("e", 5)));
Assert.assertNull(allocs.get(NodeId.newInstance("f", 6)));
// New Allocate request // New Allocate request
allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq = opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 6, "*"); createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
// Verify 6 containers were allocated // Verify 4 containers were allocated
allocateResponse = distributedScheduler.allocate(allocateRequest); allocateResponse = distributedScheduler.allocate(allocateRequest);
Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size()); Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
// Verify new containers are equally distribution on hosts c and d, // Verify new containers are equally distribution on hosts c and d,
// and none on a or b // and none on a or b
allocs = mapAllocs(allocateResponse, 6); allocs = mapAllocs(allocateResponse, 4);
Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size()); Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size()); Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
Assert.assertEquals(1, allocs.get(NodeId.newInstance("e", 5)).size());
Assert.assertEquals(1, allocs.get(NodeId.newInstance("f", 6)).size());
Assert.assertNull(allocs.get(NodeId.newInstance("a", 1))); Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
Assert.assertNull(allocs.get(NodeId.newInstance("b", 2))); Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
// Ensure the DistributedScheduler respects the list order.. // Ensure the DistributedScheduler respects the list order..
// The first request should be allocated to "d" since it is ranked higher // The first request should be allocated to "c" since it is ranked higher
// The second request should be allocated to "c" since the ranking is // The second request should be allocated to "f" since the ranking is
// flipped on every allocate response. // flipped on every allocate response.
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq = opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
@ -168,7 +176,15 @@ public class TestDistributedScheduler {
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
allocateResponse = distributedScheduler.allocate(allocateRequest); allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1); allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size()); Assert.assertEquals(1, allocs.get(NodeId.newInstance("f", 6)).size());
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
} }
private void registerAM(DistributedScheduler distributedScheduler, private void registerAM(DistributedScheduler distributedScheduler,