From 8fbe6ece24e38ee24fee0abdbed5f7dc5d3c16da Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Tue, 19 Jul 2016 23:03:58 -0700 Subject: [PATCH] YARN-5350. Distributed Scheduling: Ensure sort order of allocatable nodes returned by the RM is not lost. (asuresh) --- .../nodemanager/scheduler/LocalScheduler.java | 3 +- .../scheduler/TestLocalScheduler.java | 209 +++++++++++------- 2 files changed, 130 insertions(+), 82 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java index 10c13614022..ec0e8a4f68e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java @@ -61,6 +61,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -112,7 +113,7 @@ public final class LocalScheduler extends AbstractRequestInterceptor { private DistSchedulerParams appParams = new DistSchedulerParams(); private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter = new OpportunisticContainerAllocator.ContainerIdCounter(); - private Map nodeList = new HashMap<>(); + private Map nodeList = new LinkedHashMap<>(); // Mapping of NodeId to NodeTokens. Populated either from RM response or // generated locally if required. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java index 31f8085a5f8..8de849ba930 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java @@ -61,6 +61,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; public class TestLocalScheduler { @@ -70,6 +71,122 @@ public class TestLocalScheduler { Configuration conf = new Configuration(); LocalScheduler localScheduler = new LocalScheduler(); + RequestInterceptor finalReqIntcptr = setup(conf, localScheduler); + + registerAM(localScheduler, finalReqIntcptr, Arrays.asList( + NodeId.newInstance("a", 1), NodeId.newInstance("b", 2))); + + final AtomicBoolean flipFlag = new AtomicBoolean(false); + Mockito.when( + finalReqIntcptr.allocateForDistributedScheduling( + Mockito.any(DistSchedAllocateRequest.class))) + .thenAnswer(new Answer() { + @Override + public DistSchedAllocateResponse answer(InvocationOnMock + invocationOnMock) throws Throwable { + flipFlag.set(!flipFlag.get()); + if (flipFlag.get()) { + return createAllocateResponse(Arrays.asList( + NodeId.newInstance("c", 3), NodeId.newInstance("d", 4))); + } else { + return createAllocateResponse(Arrays.asList( + NodeId.newInstance("d", 4), NodeId.newInstance("c", 3))); + } + } + }); + + AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); + 
ResourceRequest guaranteedReq = + createResourceRequest(ExecutionType.GUARANTEED, 5, "*"); + + ResourceRequest opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + + // Verify 4 containers were allocated + AllocateResponse allocateResponse = + localScheduler.allocate(allocateRequest); + Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size()); + + // Verify equal distribution on hosts a and b + // And None on c and d + Map> allocs = mapAllocs(allocateResponse, 4); + Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size()); + Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size()); + Assert.assertNull(allocs.get(NodeId.newInstance("c", 3))); + Assert.assertNull(allocs.get(NodeId.newInstance("d", 4))); + + // New Allocate request + allocateRequest = Records.newRecord(AllocateRequest.class); + opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 6, "*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + + // Verify 6 containers were allocated + allocateResponse = localScheduler.allocate(allocateRequest); + Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size()); + + // Verify new containers are equally distributed on hosts c and d + // And None on a and b + allocs = mapAllocs(allocateResponse, 6); + Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size()); + Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size()); + Assert.assertNull(allocs.get(NodeId.newInstance("a", 1))); + Assert.assertNull(allocs.get(NodeId.newInstance("b", 2))); + + // Ensure the LocalScheduler respects the list order. + // The first request should be allocated to "d" since it is ranked higher + // The second request should be allocated to "c" since the ranking is + // flipped on every allocate response. 
+ allocateRequest = Records.newRecord(AllocateRequest.class); + opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + allocateResponse = localScheduler.allocate(allocateRequest); + allocs = mapAllocs(allocateResponse, 1); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size()); + + allocateRequest = Records.newRecord(AllocateRequest.class); + opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + allocateResponse = localScheduler.allocate(allocateRequest); + allocs = mapAllocs(allocateResponse, 1); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size()); + + allocateRequest = Records.newRecord(AllocateRequest.class); + opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + allocateResponse = localScheduler.allocate(allocateRequest); + allocs = mapAllocs(allocateResponse, 1); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size()); + } + + private void registerAM(LocalScheduler localScheduler, RequestInterceptor + finalReqIntcptr, List nodeList) throws Exception { + DistSchedRegisterResponse distSchedRegisterResponse = + Records.newRecord(DistSchedRegisterResponse.class); + distSchedRegisterResponse.setRegisterResponse( + Records.newRecord(RegisterApplicationMasterResponse.class)); + distSchedRegisterResponse.setContainerTokenExpiryInterval(12345); + distSchedRegisterResponse.setContainerIdStart(0); + distSchedRegisterResponse.setMaxAllocatableCapabilty( + Resource.newInstance(1024, 4)); + distSchedRegisterResponse.setMinAllocatableCapabilty( + Resource.newInstance(512, 2)); + distSchedRegisterResponse.setNodesForScheduling(nodeList); + Mockito.when( + 
finalReqIntcptr.registerApplicationMasterForDistributedScheduling( + Mockito.any(RegisterApplicationMasterRequest.class))) + .thenReturn(distSchedRegisterResponse); + + localScheduler.registerApplicationMaster( + Records.newRecord(RegisterApplicationMasterRequest.class)); + } + + private RequestInterceptor setup(Configuration conf, LocalScheduler + localScheduler) { NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class); Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l); Context context = Mockito.mock(Context.class); @@ -104,92 +221,20 @@ public class TestLocalScheduler { RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class); localScheduler.setNextInterceptor(finalReqIntcptr); + return finalReqIntcptr; + } - DistSchedRegisterResponse distSchedRegisterResponse = - Records.newRecord(DistSchedRegisterResponse.class); - distSchedRegisterResponse.setRegisterResponse( - Records.newRecord(RegisterApplicationMasterResponse.class)); - distSchedRegisterResponse.setContainerTokenExpiryInterval(12345); - distSchedRegisterResponse.setContainerIdStart(0); - distSchedRegisterResponse.setMaxAllocatableCapabilty( - Resource.newInstance(1024, 4)); - distSchedRegisterResponse.setMinAllocatableCapabilty( - Resource.newInstance(512, 2)); - distSchedRegisterResponse.setNodesForScheduling(Arrays.asList( - NodeId.newInstance("a", 1), NodeId.newInstance("b", 2))); - Mockito.when( - finalReqIntcptr.registerApplicationMasterForDistributedScheduling( - Mockito.any(RegisterApplicationMasterRequest.class))) - .thenReturn(distSchedRegisterResponse); - - localScheduler.registerApplicationMaster( - Records.newRecord(RegisterApplicationMasterRequest.class)); - - Mockito.when( - finalReqIntcptr.allocateForDistributedScheduling( - Mockito.any(DistSchedAllocateRequest.class))) - .thenAnswer(new Answer() { - @Override - public DistSchedAllocateResponse answer(InvocationOnMock - invocationOnMock) throws Throwable { - return 
createAllocateResponse(Arrays.asList( - NodeId.newInstance("c", 3), NodeId.newInstance("d", 4))); - } - }); - - AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - ResourceRequest guaranteedReq = Records.newRecord(ResourceRequest.class); - guaranteedReq.setExecutionTypeRequest( - ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true)); - guaranteedReq.setNumContainers(5); - guaranteedReq.setCapability(Resource.newInstance(2048, 2)); - guaranteedReq.setRelaxLocality(true); - guaranteedReq.setResourceName("*"); + private ResourceRequest createResourceRequest(ExecutionType execType, + int numContainers, String resourceName) { ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class); opportunisticReq.setExecutionTypeRequest( - ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true)); - opportunisticReq.setNumContainers(4); + ExecutionTypeRequest.newInstance(execType, true)); + opportunisticReq.setNumContainers(numContainers); opportunisticReq.setCapability(Resource.newInstance(1024, 4)); opportunisticReq.setPriority(Priority.newInstance(100)); opportunisticReq.setRelaxLocality(true); - opportunisticReq.setResourceName("*"); - allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); - - // Verify 4 containers were allocated - AllocateResponse allocateResponse = localScheduler.allocate(allocateRequest); - Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size()); - - // Verify equal distribution on hosts a and b - // And None on c and d - Map> allocs = mapAllocs(allocateResponse); - Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size()); - Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size()); - Assert.assertNull(allocs.get(NodeId.newInstance("c", 3))); - Assert.assertNull(allocs.get(NodeId.newInstance("d", 4))); - - // New Allocate request - allocateRequest = Records.newRecord(AllocateRequest.class); - opportunisticReq = 
Records.newRecord(ResourceRequest.class); - opportunisticReq.setExecutionTypeRequest( - ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true)); - opportunisticReq.setNumContainers(6); - opportunisticReq.setCapability(Resource.newInstance(512, 3)); - opportunisticReq.setPriority(Priority.newInstance(100)); - opportunisticReq.setRelaxLocality(true); - opportunisticReq.setResourceName("*"); - allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); - - // Verify 6 containers were allocated - allocateResponse = localScheduler.allocate(allocateRequest); - Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size()); - - // Verify New containers are equally distribution on hosts c and d - // And None on a and b - allocs = mapAllocs(allocateResponse); - Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size()); - Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size()); - Assert.assertNull(allocs.get(NodeId.newInstance("a", 1))); - Assert.assertNull(allocs.get(NodeId.newInstance("b", 2))); + opportunisticReq.setResourceName(resourceName); + return opportunisticReq; } private DistSchedAllocateResponse createAllocateResponse(List nodes) { @@ -202,7 +247,9 @@ public class TestLocalScheduler { } private Map> mapAllocs(AllocateResponse - allocateResponse) throws Exception { + allocateResponse, int expectedSize) throws Exception { + Assert.assertEquals(expectedSize, + allocateResponse.getAllocatedContainers().size()); Map> allocs = new HashMap<>(); for (Container c : allocateResponse.getAllocatedContainers()) { ContainerTokenIdentifier cTokId = BuilderUtils