YARN-5350. Distributed Scheduling: Ensure sort order of allocatable nodes returned by the RM is not lost. (asuresh)

This commit is contained in:
Arun Suresh 2016-07-19 23:03:58 -07:00
parent 8f0d3d69d6
commit 8fbe6ece24
2 changed files with 130 additions and 82 deletions

View File

@ -61,6 +61,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -112,7 +113,7 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
private DistSchedulerParams appParams = new DistSchedulerParams();
private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter =
new OpportunisticContainerAllocator.ContainerIdCounter();
private Map<String, NodeId> nodeList = new HashMap<>();
private Map<String, NodeId> nodeList = new LinkedHashMap<>();
// Mapping of NodeId to NodeTokens. Populated either from RM response or
// generated locally if required.

View File

@ -61,6 +61,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
public class TestLocalScheduler {
@ -70,6 +71,122 @@ public class TestLocalScheduler {
Configuration conf = new Configuration();
LocalScheduler localScheduler = new LocalScheduler();
RequestInterceptor finalReqIntcptr = setup(conf, localScheduler);
registerAM(localScheduler, finalReqIntcptr, Arrays.asList(
NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
final AtomicBoolean flipFlag = new AtomicBoolean(false);
Mockito.when(
finalReqIntcptr.allocateForDistributedScheduling(
Mockito.any(DistSchedAllocateRequest.class)))
.thenAnswer(new Answer<DistSchedAllocateResponse>() {
@Override
public DistSchedAllocateResponse answer(InvocationOnMock
invocationOnMock) throws Throwable {
flipFlag.set(!flipFlag.get());
if (flipFlag.get()) {
return createAllocateResponse(Arrays.asList(
NodeId.newInstance("c", 3), NodeId.newInstance("d", 4)));
} else {
return createAllocateResponse(Arrays.asList(
NodeId.newInstance("d", 4), NodeId.newInstance("c", 3)));
}
}
});
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
ResourceRequest guaranteedReq =
createResourceRequest(ExecutionType.GUARANTEED, 5, "*");
ResourceRequest opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
// Verify 4 containers were allocated
AllocateResponse allocateResponse =
localScheduler.allocate(allocateRequest);
Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
// Verify equal distribution on hosts a and b
// And None on c and d
Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4);
Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size());
Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size());
Assert.assertNull(allocs.get(NodeId.newInstance("c", 3)));
Assert.assertNull(allocs.get(NodeId.newInstance("d", 4)));
// New Allocate request
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 6, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
// Verify 6 containers were allocated
allocateResponse = localScheduler.allocate(allocateRequest);
Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size());
// Verify New containers are equally distribution on hosts c and d
// And None on a and b
allocs = mapAllocs(allocateResponse, 6);
Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size());
Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size());
Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
// Ensure the LocalScheduler respects the list order..
// The first request should be allocated to "d" since it is ranked higher
// The second request should be allocated to "c" since the ranking is
// flipped on every allocate response.
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
allocateResponse = localScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
allocateResponse = localScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
allocateResponse = localScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
}
private void registerAM(LocalScheduler localScheduler, RequestInterceptor
finalReqIntcptr, List<NodeId> nodeList) throws Exception {
DistSchedRegisterResponse distSchedRegisterResponse =
Records.newRecord(DistSchedRegisterResponse.class);
distSchedRegisterResponse.setRegisterResponse(
Records.newRecord(RegisterApplicationMasterResponse.class));
distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
distSchedRegisterResponse.setContainerIdStart(0);
distSchedRegisterResponse.setMaxAllocatableCapabilty(
Resource.newInstance(1024, 4));
distSchedRegisterResponse.setMinAllocatableCapabilty(
Resource.newInstance(512, 2));
distSchedRegisterResponse.setNodesForScheduling(nodeList);
Mockito.when(
finalReqIntcptr.registerApplicationMasterForDistributedScheduling(
Mockito.any(RegisterApplicationMasterRequest.class)))
.thenReturn(distSchedRegisterResponse);
localScheduler.registerApplicationMaster(
Records.newRecord(RegisterApplicationMasterRequest.class));
}
private RequestInterceptor setup(Configuration conf, LocalScheduler
localScheduler) {
NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
Context context = Mockito.mock(Context.class);
@ -104,92 +221,20 @@ public class TestLocalScheduler {
RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
localScheduler.setNextInterceptor(finalReqIntcptr);
return finalReqIntcptr;
}
DistSchedRegisterResponse distSchedRegisterResponse =
Records.newRecord(DistSchedRegisterResponse.class);
distSchedRegisterResponse.setRegisterResponse(
Records.newRecord(RegisterApplicationMasterResponse.class));
distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
distSchedRegisterResponse.setContainerIdStart(0);
distSchedRegisterResponse.setMaxAllocatableCapabilty(
Resource.newInstance(1024, 4));
distSchedRegisterResponse.setMinAllocatableCapabilty(
Resource.newInstance(512, 2));
distSchedRegisterResponse.setNodesForScheduling(Arrays.asList(
NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
Mockito.when(
finalReqIntcptr.registerApplicationMasterForDistributedScheduling(
Mockito.any(RegisterApplicationMasterRequest.class)))
.thenReturn(distSchedRegisterResponse);
localScheduler.registerApplicationMaster(
Records.newRecord(RegisterApplicationMasterRequest.class));
Mockito.when(
finalReqIntcptr.allocateForDistributedScheduling(
Mockito.any(DistSchedAllocateRequest.class)))
.thenAnswer(new Answer<DistSchedAllocateResponse>() {
@Override
public DistSchedAllocateResponse answer(InvocationOnMock
invocationOnMock) throws Throwable {
return createAllocateResponse(Arrays.asList(
NodeId.newInstance("c", 3), NodeId.newInstance("d", 4)));
}
});
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
ResourceRequest guaranteedReq = Records.newRecord(ResourceRequest.class);
guaranteedReq.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true));
guaranteedReq.setNumContainers(5);
guaranteedReq.setCapability(Resource.newInstance(2048, 2));
guaranteedReq.setRelaxLocality(true);
guaranteedReq.setResourceName("*");
private ResourceRequest createResourceRequest(ExecutionType execType,
int numContainers, String resourceName) {
ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class);
opportunisticReq.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
opportunisticReq.setNumContainers(4);
ExecutionTypeRequest.newInstance(execType, true));
opportunisticReq.setNumContainers(numContainers);
opportunisticReq.setCapability(Resource.newInstance(1024, 4));
opportunisticReq.setPriority(Priority.newInstance(100));
opportunisticReq.setRelaxLocality(true);
opportunisticReq.setResourceName("*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
// Verify 4 containers were allocated
AllocateResponse allocateResponse = localScheduler.allocate(allocateRequest);
Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
// Verify equal distribution on hosts a and b
// And None on c and d
Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse);
Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size());
Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size());
Assert.assertNull(allocs.get(NodeId.newInstance("c", 3)));
Assert.assertNull(allocs.get(NodeId.newInstance("d", 4)));
// New Allocate request
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq = Records.newRecord(ResourceRequest.class);
opportunisticReq.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
opportunisticReq.setNumContainers(6);
opportunisticReq.setCapability(Resource.newInstance(512, 3));
opportunisticReq.setPriority(Priority.newInstance(100));
opportunisticReq.setRelaxLocality(true);
opportunisticReq.setResourceName("*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
// Verify 6 containers were allocated
allocateResponse = localScheduler.allocate(allocateRequest);
Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size());
// Verify New containers are equally distribution on hosts c and d
// And None on a and b
allocs = mapAllocs(allocateResponse);
Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size());
Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size());
Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
opportunisticReq.setResourceName(resourceName);
return opportunisticReq;
}
private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) {
@ -202,7 +247,9 @@ public class TestLocalScheduler {
}
private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
allocateResponse) throws Exception {
allocateResponse, int expectedSize) throws Exception {
Assert.assertEquals(expectedSize,
allocateResponse.getAllocatedContainers().size());
Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
for (Container c : allocateResponse.getAllocatedContainers()) {
ContainerTokenIdentifier cTokId = BuilderUtils