From b61d12e30044e0d8fc98c0f469d065f01de90d49 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Mon, 2 Oct 2017 18:01:51 -0700 Subject: [PATCH] YARN-7258. Add Node and Rack Hints to Opportunistic Scheduler. (Kartheek Muthyala via asuresh). (cherry picked from commit b733348dde18a242e6c9074c512116a8baf1d281) --- .../yarn/api/records/ResourceRequest.java | 16 + .../api/impl/TestDistributedScheduling.java | 645 ------------------ ...tOpportunisticContainerAllocationE2E.java} | 2 +- .../api/protocolrecords/RemoteNode.java | 35 + .../impl/pb/RemoteNodePBImpl.java | 19 + .../OpportunisticContainerAllocator.java | 337 +++++++-- .../OpportunisticContainerContext.java | 68 +- .../yarn_server_common_service_protos.proto | 1 + .../TestOpportunisticContainerAllocator.java | 599 ++++++++++++++++ ...ortunisticContainerAllocatorAMService.java | 8 +- ...ortunisticContainerAllocatorAMService.java | 2 + 11 files changed, 1000 insertions(+), 732 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/{TestOpportunisticContainerAllocation.java => TestOpportunisticContainerAllocationE2E.java} (99%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index 5bedc879ee3..e9be6c3c14b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -222,6 +222,22 @@ public abstract class ResourceRequest implements Comparable { return this; } + /** + * Set the executionTypeRequest of the request with 'ensure + * execution type' flag set to true. + * @see ResourceRequest#setExecutionTypeRequest( + * ExecutionTypeRequest) + * @param executionType executionType of the request. + * @return {@link ResourceRequestBuilder} + */ + @Public + @Evolving + public ResourceRequestBuilder executionType(ExecutionType executionType) { + resourceRequest.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance(executionType, true)); + return this; + } + /** * Set the allocationRequestId of the request. * @see ResourceRequest#setAllocationRequestId(long) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java deleted file mode 100644 index e180f6dc29a..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ /dev/null @@ -1,645 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.client.api.impl; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.service.Service; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NMToken; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.server.MiniYARNCluster; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Validates End2End Distributed Scheduling flow which includes the AM - * specifying OPPORTUNISTIC containers in its resource requests, - * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor - * on the NM and the DistributedSchedulingProtocol used by the framework to talk - * to the OpportunisticContainerAllocatorAMService running on the RM. - */ -public class TestDistributedScheduling extends BaseAMRMProxyE2ETest { - - private static final Log LOG = - LogFactory.getLog(TestDistributedScheduling.class); - - protected MiniYARNCluster cluster; - protected YarnClient rmClient; - protected ApplicationMasterProtocol client; - protected Configuration conf; - protected Configuration yarnConf; - protected ApplicationAttemptId attemptId; - protected ApplicationId appId; - - @Before - public void doBefore() throws Exception { - cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1); - - conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration. - OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); - conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); - conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, - 10); - cluster.init(conf); - cluster.start(); - yarnConf = cluster.getConfig(); - - // the client has to connect to AMRMProxy - yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS); - rmClient = YarnClient.createYarnClient(); - rmClient.init(yarnConf); - rmClient.start(); - - // Submit application - attemptId = createApp(rmClient, cluster, conf); - appId = attemptId.getApplicationId(); - client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); - } - - @After - public void doAfter() throws Exception { - if (client != null) { - try { - client.finishApplicationMaster(FinishApplicationMasterRequest - .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null)); - rmClient.killApplication(attemptId.getApplicationId()); - attemptId = null; - } catch (Exception e) { - } - } - if (rmClient != null) { - try { - rmClient.stop(); - } catch (Exception e) { - } - } - if (cluster != null) { - try { - cluster.stop(); - } catch (Exception e) { - } - } - } - - - /** - * Validates if Allocate Requests containing only OPPORTUNISTIC container - * requests are satisfied instantly. - * - * @throws Exception - */ - @Test(timeout = 60000) - public void testOpportunisticExecutionTypeRequestE2E() throws Exception { - LOG.info("testDistributedSchedulingE2E - Register"); - - RegisterApplicationMasterResponse responseRegister = - client.registerApplicationMaster(RegisterApplicationMasterRequest - .newInstance(NetUtils.getHostname(), 1024, "")); - - Assert.assertNotNull(responseRegister); - Assert.assertNotNull(responseRegister.getQueue()); - Assert.assertNotNull(responseRegister.getApplicationACLs()); - Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey()); - Assert - .assertNotNull(responseRegister.getContainersFromPreviousAttempts()); - Assert.assertNotNull(responseRegister.getSchedulerResourceTypes()); - Assert.assertNotNull(responseRegister.getMaximumResourceCapability()); - - // Wait until the RM has been updated and verify - Map rmApps = - cluster.getResourceManager().getRMContext().getRMApps(); - boolean rmUpdated = false; - for (int i=0; i<10 && !rmUpdated; i++) { - sleep(100); - RMApp rmApp = rmApps.get(appId); - if (rmApp.getState() == RMAppState.RUNNING) { - rmUpdated = true; - } - } - RMApp rmApp = rmApps.get(appId); - Assert.assertEquals(RMAppState.RUNNING, rmApp.getState()); - - LOG.info("testDistributedSchedulingE2E - Allocate"); - - AllocateRequest request = - createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING)); - - // Replace 'ANY' requests with OPPORTUNISTIC aks and remove - // everything else - List newAskList = new ArrayList<>(); - for (ResourceRequest rr : request.getAskList()) { - if (ResourceRequest.ANY.equals(rr.getResourceName())) { - ResourceRequest newRR = ResourceRequest.newInstance(rr - .getPriority(), rr.getResourceName(), - rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(), - rr.getNodeLabelExpression(), - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)); - newAskList.add(newRR); - } - } - request.setAskList(newAskList); - - AllocateResponse allocResponse = client.allocate(request); - Assert.assertNotNull(allocResponse); - - // Ensure that all the requests are satisfied immediately - Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); - - // Verify that the allocated containers are OPPORTUNISTIC - for (Container allocatedContainer : allocResponse - .getAllocatedContainers()) { - ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils - .newContainerTokenIdentifier( - allocatedContainer.getContainerToken()); - Assert.assertEquals(ExecutionType.OPPORTUNISTIC, - containerTokenIdentifier.getExecutionType()); - } - - // Check that the RM sees OPPORTUNISTIC containers - ResourceScheduler scheduler = cluster.getResourceManager() - .getResourceScheduler(); - for (Container allocatedContainer : allocResponse - .getAllocatedContainers()) { - ContainerId containerId = allocatedContainer.getId(); - RMContainer rmContainer = scheduler.getRMContainer(containerId); - Assert.assertEquals(ExecutionType.OPPORTUNISTIC, - rmContainer.getExecutionType()); - } - - LOG.info("testDistributedSchedulingE2E - Finish"); - } - - /** - * Validates if Allocate Requests containing both GUARANTEED and OPPORTUNISTIC - * container requests works as expected. - * - * @throws Exception - */ - @Test(timeout = 60000) - public void testMixedExecutionTypeRequestE2E() throws Exception { - LOG.info("testDistributedSchedulingE2E - Register"); - - RegisterApplicationMasterResponse responseRegister = - client.registerApplicationMaster(RegisterApplicationMasterRequest - .newInstance(NetUtils.getHostname(), 1024, "")); - - Assert.assertNotNull(responseRegister); - Assert.assertNotNull(responseRegister.getQueue()); - Assert.assertNotNull(responseRegister.getApplicationACLs()); - Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey()); - Assert - .assertNotNull(responseRegister.getContainersFromPreviousAttempts()); - Assert.assertNotNull(responseRegister.getSchedulerResourceTypes()); - Assert.assertNotNull(responseRegister.getMaximumResourceCapability()); - - RMApp rmApp = - cluster.getResourceManager().getRMContext().getRMApps().get(appId); - Assert.assertEquals(RMAppState.RUNNING, rmApp.getState()); - - LOG.info("testDistributedSchedulingE2E - Allocate"); - - AllocateRequest request = - createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING)); - List askList = request.getAskList(); - List newAskList = new ArrayList<>(askList); - - // Duplicate all ANY requests marking them as opportunistic - for (ResourceRequest rr : askList) { - if (ResourceRequest.ANY.equals(rr.getResourceName())) { - ResourceRequest newRR = ResourceRequest.newInstance(rr - .getPriority(), rr.getResourceName(), - rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(), - rr.getNodeLabelExpression(), - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)); - newAskList.add(newRR); - } - } - request.setAskList(newAskList); - - AllocateResponse allocResponse = client.allocate(request); - Assert.assertNotNull(allocResponse); - - // Ensure that all the requests are satisfied immediately - Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); - - // Verify that the allocated containers are OPPORTUNISTIC - for (Container allocatedContainer : allocResponse - .getAllocatedContainers()) { - ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils - .newContainerTokenIdentifier( - allocatedContainer.getContainerToken()); - Assert.assertEquals(ExecutionType.OPPORTUNISTIC, - containerTokenIdentifier.getExecutionType()); - } - - request.setAskList(new ArrayList()); - request.setResponseId(request.getResponseId() + 1); - - Thread.sleep(1000); - - // RM should allocate GUARANTEED containers within 2 calls to allocate() - allocResponse = client.allocate(request); - Assert.assertNotNull(allocResponse); - Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); - - // Verify that the allocated containers are GUARANTEED - for (Container allocatedContainer : allocResponse - .getAllocatedContainers()) { - ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils - .newContainerTokenIdentifier( - allocatedContainer.getContainerToken()); - Assert.assertEquals(ExecutionType.GUARANTEED, - containerTokenIdentifier.getExecutionType()); - } - - LOG.info("testDistributedSchedulingE2E - Finish"); - } - - /** - * Validates if AMRMClient can be used with Distributed Scheduling turned on. - * - * @throws Exception - */ - @Test(timeout = 120000) - @SuppressWarnings("unchecked") - public void testAMRMClient() throws Exception { - AMRMClientImpl amClient = null; - try { - Priority priority = Priority.newInstance(1); - Priority priority2 = Priority.newInstance(2); - Resource capability = Resource.newInstance(1024, 1); - - List nodeReports = rmClient.getNodeReports(NodeState.RUNNING); - String node = nodeReports.get(0).getNodeId().getHost(); - String rack = nodeReports.get(0).getRackName(); - String[] nodes = new String[]{node}; - String[] racks = new String[]{rack}; - - // start am rm client - amClient = new AMRMClientImpl(client); - amClient.init(yarnConf); - amClient.start(); - amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, ""); - - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority2, - 0, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority2, - 0, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority2, - 0, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - - RemoteRequestsTable remoteRequestsTable = - amClient.getTable(0); - int containersRequestedNode = remoteRequestsTable.get(priority, - node, ExecutionType.GUARANTEED, capability).remoteRequest - .getNumContainers(); - int containersRequestedRack = remoteRequestsTable.get(priority, - rack, ExecutionType.GUARANTEED, capability).remoteRequest - .getNumContainers(); - int containersRequestedAny = remoteRequestsTable.get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) - .remoteRequest.getNumContainers(); - int oppContainersRequestedAny = - remoteRequestsTable.get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, capability).remoteRequest - .getNumContainers(); - - assertEquals(2, containersRequestedNode); - assertEquals(2, containersRequestedRack); - assertEquals(2, containersRequestedAny); - assertEquals(1, oppContainersRequestedAny); - - assertEquals(4, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - // RM should allocate container within 2 calls to allocate() - int allocatedContainerCount = 0; - int iterationsLeft = 10; - Set releases = new TreeSet<>(); - - amClient.getNMTokenCache().clearCache(); - Assert.assertEquals(0, - amClient.getNMTokenCache().numberOfTokensInCache()); - HashMap receivedNMTokens = new HashMap<>(); - - while (allocatedContainerCount < - (containersRequestedAny + oppContainersRequestedAny) - && iterationsLeft-- > 0) { - AllocateResponse allocResponse = amClient.allocate(0.1f); - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - allocatedContainerCount += allocResponse.getAllocatedContainers() - .size(); - for (Container container : allocResponse.getAllocatedContainers()) { - ContainerId rejectContainerId = container.getId(); - releases.add(rejectContainerId); - } - - for (NMToken token : allocResponse.getNMTokens()) { - String nodeID = token.getNodeId().toString(); - receivedNMTokens.put(nodeID, token.getToken()); - } - - if (allocatedContainerCount < containersRequestedAny) { - // sleep to let NM's heartbeat to RM and trigger allocations - sleep(100); - } - } - - assertEquals(allocatedContainerCount, - containersRequestedAny + oppContainersRequestedAny); - for (ContainerId rejectContainerId : releases) { - amClient.releaseAssignedContainer(rejectContainerId); - } - assertEquals(3, amClient.release.size()); - assertEquals(0, amClient.ask.size()); - - // need to tell the AMRMClient that we dont need these resources anymore - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, - 0, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - assertEquals(4, amClient.ask.size()); - - // test RPC exception handling - amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability, - nodes, racks, priority)); - amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability, - nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, - 0, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - - final AMRMClient amc = amClient; - ApplicationMasterProtocol realRM = amClient.rmClient; - try { - ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol - .class); - when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer( - new Answer() { - public AllocateResponse answer(InvocationOnMock invocation) - throws Exception { - amc.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, - racks, priority)); - amc.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, - priority)); - amc.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, - priority2, 0, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - throw new Exception(); - } - }); - amClient.rmClient = mockRM; - amClient.allocate(0.1f); - } catch (Exception ioe) { - } finally { - amClient.rmClient = realRM; - } - - assertEquals(3, amClient.release.size()); - assertEquals(6, amClient.ask.size()); - - iterationsLeft = 3; - // do a few iterations to ensure RM is not going send new containers - while (iterationsLeft-- > 0) { - // inform RM of rejection - AllocateResponse allocResponse = amClient.allocate(0.1f); - // RM did not send new containers because AM does not need any - assertEquals(0, allocResponse.getAllocatedContainers().size()); - if (allocResponse.getCompletedContainersStatuses().size() > 0) { - for (ContainerStatus cStatus : allocResponse - .getCompletedContainersStatuses()) { - if (releases.contains(cStatus.getContainerId())) { - assertEquals(cStatus.getState(), ContainerState.COMPLETE); - assertEquals(-100, cStatus.getExitStatus()); - releases.remove(cStatus.getContainerId()); - } - } - } - if (iterationsLeft > 0) { - // sleep to make sure NM's heartbeat - sleep(100); - } - } - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, - null, null); - - } finally { - if (amClient != null && amClient.getServiceState() == Service.STATE - .STARTED) { - amClient.stop(); - } - } - } - - /** - * Check if an AM can ask for opportunistic containers and get them. - * @throws Exception - */ - @Test - public void testAMOpportunistic() throws Exception { - // Basic container to request - Resource capability = Resource.newInstance(1024, 1); - Priority priority = Priority.newInstance(1); - - // Get the cluster topology - List nodeReports = rmClient.getNodeReports(NodeState.RUNNING); - String node = nodeReports.get(0).getNodeId().getHost(); - String rack = nodeReports.get(0).getRackName(); - String[] nodes = new String[]{node}; - String[] racks = new String[]{rack}; - - // Create an AM to request resources - AMRMClient amClient = null; - try { - amClient = new AMRMClientImpl(client); - amClient.init(yarnConf); - amClient.start(); - amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, ""); - - // AM requests an opportunistic container - ExecutionTypeRequest execTypeRequest = - ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true); - ContainerRequest containerRequest = new AMRMClient.ContainerRequest( - capability, nodes, racks, priority, 0, true, null, execTypeRequest); - amClient.addContainerRequest(containerRequest); - - // Wait until the container is allocated - ContainerId opportunisticContainerId = null; - for (int i=0; i<10 && opportunisticContainerId == null; i++) { - AllocateResponse allocResponse = amClient.allocate(0.1f); - List allocatedContainers = - allocResponse.getAllocatedContainers(); - for (Container allocatedContainer : allocatedContainers) { - // Check that this is the container we required - assertEquals(ExecutionType.OPPORTUNISTIC, - allocatedContainer.getExecutionType()); - opportunisticContainerId = allocatedContainer.getId(); - } - sleep(100); - } - assertNotNull(opportunisticContainerId); - - // The RM sees the container as OPPORTUNISTIC - ResourceScheduler scheduler = cluster.getResourceManager() - .getResourceScheduler(); - RMContainer rmContainer = scheduler.getRMContainer( - opportunisticContainerId); - assertEquals(ExecutionType.OPPORTUNISTIC, - rmContainer.getExecutionType()); - - // Release the opportunistic container - amClient.releaseAssignedContainer(opportunisticContainerId); - // Wait for the release container to appear - boolean released = false; - for (int i=0; i<10 && !released; i++) { - AllocateResponse allocResponse = amClient.allocate(0.1f); - List completedContainers = - allocResponse.getCompletedContainersStatuses(); - for (ContainerStatus completedContainer : completedContainers) { - ContainerId completedContainerId = - completedContainer.getContainerId(); - assertEquals(completedContainerId, opportunisticContainerId); - released = true; - } - if (!released) { - sleep(100); - } - } - assertTrue(released); - - // The RM shouldn't see the container anymore - rmContainer = scheduler.getRMContainer(opportunisticContainerId); - assertNull(rmContainer); - - // Clean the AM - amClient.unregisterApplicationMaster( - FinalApplicationStatus.SUCCEEDED, null, null); - } finally { - if (amClient != null && - amClient.getServiceState() == Service.STATE.STARTED) { - amClient.close(); - } - } - } - - private void sleep(int sleepTime) { - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java similarity index 99% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java index 305d18b6525..c6af487f0d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java @@ -88,7 +88,7 @@ import static org.junit.Assert.assertNotNull; * Class that tests the allocation of OPPORTUNISTIC containers through the * centralized ResourceManager. */ -public class TestOpportunisticContainerAllocation { +public class TestOpportunisticContainerAllocationE2E { private static Configuration conf = null; private static MiniYARNCluster yarnCluster = null; private static YarnClient yarnClient = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java index e403a12388b..f621aa209a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java @@ -46,6 +46,24 @@ public abstract class RemoteNode implements Comparable { return remoteNode; } + /** + * Create new Instance. + * @param nodeId NodeId. + * @param httpAddress Http address. + * @param rackName Rack Name. + * @return RemoteNode instance. + */ + @Private + @Unstable + public static RemoteNode newInstance(NodeId nodeId, String httpAddress, + String rackName) { + RemoteNode remoteNode = Records.newRecord(RemoteNode.class); + remoteNode.setNodeId(nodeId); + remoteNode.setHttpAddress(httpAddress); + remoteNode.setRackName(rackName); + return remoteNode; + } + /** * Get {@link NodeId}. * @return NodeId. @@ -78,6 +96,22 @@ public abstract class RemoteNode implements Comparable { @Unstable public abstract void setHttpAddress(String httpAddress); + /** + * Get Rack Name. + * @return Rack Name. + */ + @Private + @Unstable + public abstract String getRackName(); + + /** + * Set Rack Name. + * @param rackName Rack Name. + */ + @Private + @Unstable + public abstract void setRackName(String rackName); + /** * Use the underlying {@link NodeId} comparator. * @param other RemoteNode. @@ -92,6 +126,7 @@ public abstract class RemoteNode implements Comparable { public String toString() { return "RemoteNode{" + "nodeId=" + getNodeId() + ", " + + "rackName=" + getRackName() + ", " + "httpAddress=" + getHttpAddress() + "}"; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java index 3e4fd4ac5c3..c2492cf4663 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java @@ -117,6 +117,25 @@ public class RemoteNodePBImpl extends RemoteNode { builder.setHttpAddress(httpAddress); } + @Override + public String getRackName() { + RemoteNodeProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRackName()) { + return null; + } + return (p.getRackName()); + } + + @Override + public void setRackName(String rackName) { + maybeInitBuilder(); + if (rackName == null) { + builder.clearRackName(); + return; + } + builder.setRackName(rackName); + } + @Override public int hashCode() { return getProto().hashCode(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 782dc029b79..ede4958a34b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -45,11 +45,14 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** @@ -61,6 +64,10 @@ import java.util.concurrent.atomic.AtomicLong; */ public class OpportunisticContainerAllocator { + private static final int NODE_LOCAL_LOOP = 0; + private static final int RACK_LOCAL_LOOP = 1; + private static final int OFF_SWITCH_LOOP = 2; + /** * This class encapsulates application specific parameters used to build a * Container. @@ -70,6 +77,7 @@ public class OpportunisticContainerAllocator { private Resource minResource; private Resource incrementResource; private int containerTokenExpiryInterval; + private int maxAllocationsPerSchedulerKeyPerRound = 1; /** * Return Max Resource. @@ -135,6 +143,24 @@ public class OpportunisticContainerAllocator { int containerTokenExpiryInterval) { this.containerTokenExpiryInterval = containerTokenExpiryInterval; } + + /** + * Get the Max Allocations per Scheduler Key per allocation round. + * @return maxAllocationsPerSchedulerKeyPerRound. + */ + public int getMaxAllocationsPerSchedulerKeyPerRound() { + return maxAllocationsPerSchedulerKeyPerRound; + } + + /** + * Set the Max Allocations per Scheduler Key per allocation round. + * @param maxAllocationsPerSchedulerKeyPerRound val. + */ + public void setMaxAllocationsPerSchedulerKeyPerRound( + int maxAllocationsPerSchedulerKeyPerRound) { + this.maxAllocationsPerSchedulerKeyPerRound = + maxAllocationsPerSchedulerKeyPerRound; + } } /** @@ -188,6 +214,72 @@ public class OpportunisticContainerAllocator { private final BaseContainerTokenSecretManager tokenSecretManager; + static class Allocation { + private final Container container; + private final String resourceName; + + Allocation(Container container, String resourceName) { + this.container = container; + this.resourceName = resourceName; + } + + Container getContainer() { + return container; + } + + String getResourceName() { + return resourceName; + } + } + + static class EnrichedResourceRequest { + private final Map nodeLocations = new HashMap<>(); + private final Map rackLocations = new HashMap<>(); + private final ResourceRequest request; + + EnrichedResourceRequest(ResourceRequest request) { + this.request = request; + } + + ResourceRequest getRequest() { + return request; + } + + void addLocation(String location, int count) { + Map m = rackLocations; + if (!location.startsWith("/")) { + m = nodeLocations; + } + if (count == 0) { + m.remove(location); + } else { + m.put(location, new AtomicInteger(count)); + } + } + + void removeLocation(String location) { + Map m = rackLocations; + AtomicInteger count = m.get(location); + if (count == null) { + m = nodeLocations; + count = m.get(location); + } + + if (count != null) { + if (count.decrementAndGet() == 0) { + m.remove(location); + } + } + } + + Set getNodeLocations() { + return nodeLocations.keySet(); + } + + Set getRackLocations() { + return rackLocations.keySet(); + } + } /** * Create a new Opportunistic Container Allocator. * @param tokenSecretManager TokenSecretManager @@ -223,37 +315,55 @@ public class OpportunisticContainerAllocator { // Add OPPORTUNISTIC requests to the outstanding ones. opportContext.addToOutstandingReqs(oppResourceReqs); - // Satisfy the outstanding OPPORTUNISTIC requests. + Set nodeBlackList = new HashSet<>(opportContext.getBlacklist()); List allocatedContainers = new ArrayList<>(); - for (SchedulerRequestKey schedulerKey : - opportContext.getOutstandingOpReqs().descendingKeySet()) { - // Allocated containers : - // Key = Requested Capability, - // Value = List of Containers of given cap (the actual container size - // might be different than what is requested, which is why - // we need the requested capability (key) to match against - // the outstanding reqs) - Map> allocated = allocate(rmIdentifier, - opportContext, schedulerKey, applicationAttemptId, appSubmitter); - for (Map.Entry> e : allocated.entrySet()) { - opportContext.matchAllocationToOutstandingRequest( - e.getKey(), e.getValue()); - allocatedContainers.addAll(e.getValue()); + + // Satisfy the outstanding OPPORTUNISTIC requests. + boolean continueLoop = true; + while (continueLoop) { + continueLoop = false; + List>> allocations = new ArrayList<>(); + for (SchedulerRequestKey schedulerKey : + opportContext.getOutstandingOpReqs().descendingKeySet()) { + // Allocated containers : + // Key = Requested Capability, + // Value = List of Containers of given cap (the actual container size + // might be different than what is requested, which is why + // we need the requested capability (key) to match against + // the outstanding reqs) + Map> allocation = allocate( + rmIdentifier, opportContext, schedulerKey, applicationAttemptId, + appSubmitter, nodeBlackList); + if (allocation.size() > 0) { + allocations.add(allocation); + continueLoop = true; + } + } + for (Map> allocation : allocations) { + for (Map.Entry> e : allocation.entrySet()) { + opportContext.matchAllocationToOutstandingRequest( + e.getKey(), e.getValue()); + for (Allocation alloc : e.getValue()) { + allocatedContainers.add(alloc.getContainer()); + } + } } } return allocatedContainers; } - private Map> allocate(long rmIdentifier, + private Map> allocate(long rmIdentifier, OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, - ApplicationAttemptId appAttId, String userName) throws YarnException { - Map> containers = new HashMap<>(); - for (ResourceRequest anyAsk : + ApplicationAttemptId appAttId, String userName, Set blackList) + throws YarnException { + Map> containers = new HashMap<>(); + for (EnrichedResourceRequest enrichedAsk : appContext.getOutstandingOpReqs().get(schedKey).values()) { allocateContainersInternal(rmIdentifier, appContext.getAppParams(), - appContext.getContainerIdGenerator(), appContext.getBlacklist(), - appAttId, appContext.getNodeMap(), userName, containers, anyAsk); + appContext.getContainerIdGenerator(), blackList, appAttId, + appContext.getNodeMap(), userName, containers, enrichedAsk); + ResourceRequest anyAsk = enrichedAsk.getRequest(); if (!containers.isEmpty()) { LOG.info("Opportunistic allocation requested for [priority={}, " + "allocationRequestId={}, num_containers={}, capability={}] " @@ -269,43 +379,162 @@ public class OpportunisticContainerAllocator { AllocationParams appParams, ContainerIdGenerator idCounter, Set blacklist, ApplicationAttemptId id, Map allNodes, String userName, - Map> containers, ResourceRequest anyAsk) + Map> allocations, + EnrichedResourceRequest enrichedAsk) throws YarnException { - int toAllocate = anyAsk.getNumContainers() - - (containers.isEmpty() ? 0 : - containers.get(anyAsk.getCapability()).size()); - - List nodesForScheduling = new ArrayList<>(); - for (Entry nodeEntry : allNodes.entrySet()) { - // Do not use blacklisted nodes for scheduling. - if (blacklist.contains(nodeEntry.getKey())) { - continue; - } - nodesForScheduling.add(nodeEntry.getValue()); - } - if (nodesForScheduling.isEmpty()) { - LOG.warn("No nodes available for allocating opportunistic containers. [" + - "allNodes={}, blacklist={}]", allNodes, blacklist); + if (allNodes.size() == 0) { + LOG.info("No nodes currently available to " + + "allocate OPPORTUNISTIC containers."); return; } + ResourceRequest anyAsk = enrichedAsk.getRequest(); + int toAllocate = anyAsk.getNumContainers() + - (allocations.isEmpty() ? 0 : + allocations.get(anyAsk.getCapability()).size()); + toAllocate = Math.min(toAllocate, + appParams.getMaxAllocationsPerSchedulerKeyPerRound()); int numAllocated = 0; - int nextNodeToSchedule = 0; - for (int numCont = 0; numCont < toAllocate; numCont++) { - nextNodeToSchedule++; - nextNodeToSchedule %= nodesForScheduling.size(); - RemoteNode node = nodesForScheduling.get(nextNodeToSchedule); - Container container = buildContainer(rmIdentifier, appParams, idCounter, - anyAsk, id, userName, node); - List cList = containers.get(anyAsk.getCapability()); - if (cList == null) { - cList = new ArrayList<>(); - containers.put(anyAsk.getCapability(), cList); - } - cList.add(container); - numAllocated++; - LOG.info("Allocated [{}] as opportunistic.", container.getId()); + // Node Candidates are selected as follows: + // * Node local candidates selected in loop == 0 + // * Rack local candidates selected in loop == 1 + // * From loop == 2 onwards, we revert to off switch allocations. + int loopIndex = OFF_SWITCH_LOOP; + if (enrichedAsk.getNodeLocations().size() > 0) { + loopIndex = NODE_LOCAL_LOOP; } - LOG.info("Allocated {} opportunistic containers.", numAllocated); + while (numAllocated < toAllocate) { + Collection nodeCandidates = + findNodeCandidates(loopIndex, allNodes, blacklist, enrichedAsk); + for (RemoteNode rNode : nodeCandidates) { + String rNodeHost = rNode.getNodeId().getHost(); + // Ignore black list + if (blacklist.contains(rNodeHost)) { + LOG.info("Nodes for scheduling has a blacklisted node" + + " [" + rNodeHost + "].."); + continue; + } + String location = ResourceRequest.ANY; + if (loopIndex == NODE_LOCAL_LOOP) { + if (enrichedAsk.getNodeLocations().contains(rNodeHost)) { + location = rNodeHost; + } else { + continue; + } + } + if (loopIndex == RACK_LOCAL_LOOP) { + if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) { + location = rNode.getRackName(); + } else { + continue; + } + } + Container container = createContainer(rmIdentifier, appParams, + idCounter, id, userName, allocations, location, + anyAsk, rNode); + numAllocated++; + // Try to spread the allocations across the nodes. + // But don't add if it is a node local request. + if (loopIndex != NODE_LOCAL_LOOP) { + blacklist.add(rNode.getNodeId().getHost()); + } + LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + + "location [" + location + "]"); + if (numAllocated >= toAllocate) { + break; + } + } + if (loopIndex == NODE_LOCAL_LOOP && + enrichedAsk.getRackLocations().size() > 0) { + loopIndex = RACK_LOCAL_LOOP; + } else { + loopIndex++; + } + // Handle case where there are no nodes remaining after blacklist is + // considered. + if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) { + LOG.warn("Unable to allocate any opportunistic containers."); + break; + } + } + } + + private Collection findNodeCandidates(int loopIndex, + Map allNodes, Set blackList, + EnrichedResourceRequest enrichedRR) { + if (loopIndex > 1) { + return allNodes.values(); + } else { + LinkedList retList = new LinkedList<>(); + int numContainers = enrichedRR.getRequest().getNumContainers(); + while (numContainers > 0) { + if (loopIndex == 0) { + // Node local candidates + numContainers = collectNodeLocalCandidates( + allNodes, enrichedRR, retList, numContainers); + } else { + // Rack local candidates + numContainers = collectRackLocalCandidates( + allNodes, enrichedRR, retList, blackList, numContainers); + } + if (numContainers == enrichedRR.getRequest().getNumContainers()) { + // If there is no change in numContainers, then there is no point + // in looping again. + break; + } + } + return retList; + } + } + + private int collectRackLocalCandidates(Map allNodes, + EnrichedResourceRequest enrichedRR, LinkedList retList, + Set blackList, int numContainers) { + for (RemoteNode rNode : allNodes.values()) { + if (enrichedRR.getRackLocations().contains(rNode.getRackName())) { + if (blackList.contains(rNode.getNodeId().getHost())) { + retList.addLast(rNode); + } else { + retList.addFirst(rNode); + numContainers--; + } + } + if (numContainers == 0) { + break; + } + } + return numContainers; + } + + private int collectNodeLocalCandidates(Map allNodes, + EnrichedResourceRequest enrichedRR, List retList, + int numContainers) { + for (String nodeName : enrichedRR.getNodeLocations()) { + RemoteNode remoteNode = allNodes.get(nodeName); + if (remoteNode != null) { + retList.add(remoteNode); + numContainers--; + } + if (numContainers == 0) { + break; + } + } + return numContainers; + } + + private Container createContainer(long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + ApplicationAttemptId id, String userName, + Map> allocations, String location, + ResourceRequest anyAsk, RemoteNode rNode) throws YarnException { + Container container = buildContainer(rmIdentifier, appParams, + idCounter, anyAsk, id, userName, rNode); + List allocList = allocations.get(anyAsk.getCapability()); + if (allocList == null) { + allocList = new ArrayList<>(); + allocations.put(anyAsk.getCapability(), allocList); + } + allocList.add(new Allocation(container, location)); + return container; } private Container buildContainer(long rmIdentifier, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java index 1b1c5b9c86a..246d450668d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.scheduler; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; @@ -35,8 +34,10 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.Allocation; import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams; import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator; +import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.EnrichedResourceRequest; /** * This encapsulates application specific information used by the @@ -53,7 +54,8 @@ public class OpportunisticContainerContext { new ContainerIdGenerator(); private volatile List nodeList = new LinkedList<>(); - private final Map nodeMap = new LinkedHashMap<>(); + private final LinkedHashMap nodeMap = + new LinkedHashMap<>(); private final Set blacklist = new HashSet<>(); @@ -61,7 +63,8 @@ public class OpportunisticContainerContext { // Resource Name (host/rack/any) and capability. This mapping is required // to match a received Container to an outstanding OPPORTUNISTIC // ResourceRequest (ask). - private final TreeMap> + private final TreeMap + > outstandingOpReqs = new TreeMap<>(); public AllocationParams getAppParams() { @@ -107,7 +110,7 @@ public class OpportunisticContainerContext { return blacklist; } - public TreeMap> + public TreeMap> getOutstandingOpReqs() { return outstandingOpReqs; } @@ -125,36 +128,32 @@ public class OpportunisticContainerContext { for (ResourceRequest request : resourceAsks) { SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); - // TODO: Extend for Node/Rack locality. We only handle ANY requests now - if (!ResourceRequest.isAnyLocation(request.getResourceName())) { - continue; - } - - if (request.getNumContainers() == 0) { - continue; - } - - Map reqMap = + Map reqMap = outstandingOpReqs.get(schedulerKey); if (reqMap == null) { reqMap = new HashMap<>(); outstandingOpReqs.put(schedulerKey, reqMap); } - ResourceRequest resourceRequest = reqMap.get(request.getCapability()); - if (resourceRequest == null) { - resourceRequest = request; - reqMap.put(request.getCapability(), request); + EnrichedResourceRequest eReq = reqMap.get(request.getCapability()); + if (eReq == null) { + eReq = new EnrichedResourceRequest(request); + reqMap.put(request.getCapability(), eReq); + } + // Set numContainers only for ANY request + if (ResourceRequest.isAnyLocation(request.getResourceName())) { + eReq.getRequest().setResourceName(ResourceRequest.ANY); + eReq.getRequest().setNumContainers(request.getNumContainers()); } else { - resourceRequest.setNumContainers( - resourceRequest.getNumContainers() + request.getNumContainers()); + eReq.addLocation(request.getResourceName(), request.getNumContainers()); } if (ResourceRequest.isAnyLocation(request.getResourceName())) { LOG.info("# of outstandingOpReqs in ANY (at " + "priority = " + schedulerKey.getPriority() + ", allocationReqId = " + schedulerKey.getAllocationRequestId() + ", with capability = " + request.getCapability() + " ) : " - + resourceRequest.getNumContainers()); + + ", with location = " + request.getResourceName() + " ) : " + + ", numContainers = " + eReq.getRequest().getNumContainers()); } } } @@ -163,25 +162,34 @@ public class OpportunisticContainerContext { * This method matches a returned list of Container Allocations to any * outstanding OPPORTUNISTIC ResourceRequest. * @param capability Capability - * @param allocatedContainers Allocated Containers + * @param allocations Allocations. */ public void matchAllocationToOutstandingRequest(Resource capability, - List allocatedContainers) { - for (Container c : allocatedContainers) { + List allocations) { + for (OpportunisticContainerAllocator.Allocation allocation : allocations) { SchedulerRequestKey schedulerKey = - SchedulerRequestKey.extractFrom(c); - Map asks = + SchedulerRequestKey.extractFrom(allocation.getContainer()); + Map asks = outstandingOpReqs.get(schedulerKey); if (asks == null) { continue; } - ResourceRequest rr = asks.get(capability); - if (rr != null) { - rr.setNumContainers(rr.getNumContainers() - 1); - if (rr.getNumContainers() == 0) { + EnrichedResourceRequest err = asks.get(capability); + if (err != null) { + int numContainers = err.getRequest().getNumContainers(); + numContainers--; + err.getRequest().setNumContainers(numContainers); + if (numContainers == 0) { asks.remove(capability); + if (asks.size() == 0) { + outstandingOpReqs.remove(schedulerKey); + } + } else { + if (!ResourceRequest.isAnyLocation(allocation.getResourceName())) { + err.removeLocation(allocation.getResourceName()); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index e889cdec82c..8e59f141be8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -30,6 +30,7 @@ import "yarn_service_protos.proto"; message RemoteNodeProto { optional NodeIdProto node_id = 1; optional string http_address = 2; + optional string rack_name = 3; } message RegisterDistributedSchedulingAMResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java new file mode 100644 index 00000000000..788b0b386a8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java @@ -0,0 +1,599 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class TestOpportunisticContainerAllocator { + + private static final int GB = 1024; + private OpportunisticContainerAllocator allocator = null; + private OpportunisticContainerContext oppCntxt = null; + + @Before + public void setup() { + SecurityUtil.setTokenServiceUseIp(false); + final MasterKey mKey = new MasterKey() { + @Override + public int getKeyId() { + return 1; + } + @Override + public void setKeyId(int keyId) {} + @Override + public ByteBuffer getBytes() { + return ByteBuffer.allocate(8); + } + @Override + public void setBytes(ByteBuffer bytes) {} + }; + BaseContainerTokenSecretManager secMan = + new BaseContainerTokenSecretManager(new Configuration()) { + @Override + public MasterKey getCurrentKey() { + return mKey; + } + + @Override + public byte[] createPassword(ContainerTokenIdentifier identifier) { + return new byte[]{1, 2}; + } + }; + allocator = new OpportunisticContainerAllocator(secMan); + oppCntxt = new OpportunisticContainerContext(); + oppCntxt.getAppParams().setMinResource(Resource.newInstance(1024, 1)); + oppCntxt.getAppParams().setIncrementResource(Resource.newInstance(512, 1)); + oppCntxt.getAppParams().setMaxResource(Resource.newInstance(1024, 10)); + } + + @Test + public void testSimpleAllocation() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List reqs = + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"))); + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + Assert.assertEquals(1, containers.size()); + Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size()); + } + + @Test + public void testBlacklistRejection() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + Arrays.asList("h1", "h2"), new ArrayList<>()); + List reqs = + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r2"))); + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + Assert.assertEquals(0, containers.size()); + Assert.assertEquals(1, oppCntxt.getOutstandingOpReqs().size()); + } + + @Test + public void testRoundRobinSimpleAllocation() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List reqs = + Arrays.asList( + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(3) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r1"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + System.out.println(containers); + Set allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeHttpAddress()); + } + Assert.assertTrue(allocatedHosts.contains("h1:1234")); + Assert.assertTrue(allocatedHosts.contains("h2:1234")); + Assert.assertTrue(allocatedHosts.contains("h3:1234")); + Assert.assertEquals(3, containers.size()); + } + + @Test + public void testNodeLocalAllocation() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List reqs = + Arrays.asList( + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName("/r1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName("h1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName("/r1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName("h1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r1"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + System.out.println(containers); + Set allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeHttpAddress()); + } + Assert.assertEquals(2, containers.size()); + Assert.assertTrue(allocatedHosts.contains("h1:1234")); + Assert.assertFalse(allocatedHosts.contains("h2:1234")); + Assert.assertFalse(allocatedHosts.contains("h3:1234")); + } + + @Test + public void testNodeLocalAllocationSameSchedKey() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List reqs = + Arrays.asList( + ResourceRequest.newBuilder().allocationRequestId(2) + .numContainers(2) + .priority(Priority.newInstance(1)) + .resourceName("/r1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .numContainers(2) + .priority(Priority.newInstance(1)) + .resourceName("h1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .numContainers(2) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r1"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + System.out.println(containers); + Set allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeHttpAddress()); + } + Assert.assertEquals(2, containers.size()); + Assert.assertTrue(allocatedHosts.contains("h1:1234")); + Assert.assertFalse(allocatedHosts.contains("h2:1234")); + Assert.assertFalse(allocatedHosts.contains("h3:1234")); + } + + @Test + public void testSimpleRackLocalAllocation() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List reqs = + Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "h1", + Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "/r1", + Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + Set allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeHttpAddress()); + } + Assert.assertTrue(allocatedHosts.contains("h2:1234")); + Assert.assertFalse(allocatedHosts.contains("h3:1234")); + Assert.assertFalse(allocatedHosts.contains("h4:1234")); + Assert.assertEquals(1, containers.size()); + } + + @Test + public void testRoundRobinRackLocalAllocation() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List reqs = + Arrays.asList( + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName("/r1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName("h1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName("/r1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName("h1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + Set allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeHttpAddress()); + } + System.out.println(containers); + Assert.assertTrue(allocatedHosts.contains("h2:1234")); + Assert.assertTrue(allocatedHosts.contains("h5:1234")); + Assert.assertFalse(allocatedHosts.contains("h3:1234")); + Assert.assertFalse(allocatedHosts.contains("h4:1234")); + Assert.assertEquals(2, containers.size()); + } + + @Test + public void testRoundRobinRackLocalAllocationSameSchedKey() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List reqs = + Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "h1", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "/r1", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + Set allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeHttpAddress()); + } + System.out.println(containers); + Assert.assertTrue(allocatedHosts.contains("h2:1234")); + Assert.assertTrue(allocatedHosts.contains("h5:1234")); + Assert.assertFalse(allocatedHosts.contains("h3:1234")); + Assert.assertFalse(allocatedHosts.contains("h4:1234")); + Assert.assertEquals(2, containers.size()); + } + + @Test + public void testOffSwitchAllocationWhenNoNodeOrRack() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List reqs = + Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "h6", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "/r3", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + System.out.println(containers); + Assert.assertEquals(2, containers.size()); + } + + @Test + public void testLotsOfContainersRackLocalAllocationSameSchedKey() + throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List reqs = + Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 1000, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "h1", + Resources.createResource(1 * GB), 1000, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "/r1", + Resources.createResource(1 * GB), 1000, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List containers = new ArrayList<>(); + for (int i = 0; i < 250; i++) { + containers.addAll(allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser")); + } + Assert.assertEquals(1000, containers.size()); + } + + @Test + public void testLotsOfContainersRackLocalAllocation() + throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List reqs = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) + .priority(Priority.newInstance(1)) + .resourceName("*") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) + .priority(Priority.newInstance(1)) + .resourceName("h1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) + .priority(Priority.newInstance(1)) + .resourceName("/r1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + } + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List containers = new ArrayList<>(); + for (int i = 0; i < 25; i++) { + containers.addAll(allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser")); + } + Assert.assertEquals(100, containers.size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 4fc2916fe0c..98944af6e92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -432,8 +432,12 @@ public class OpportunisticContainerAllocatorAMService private RemoteNode convertToRemoteNode(NodeId nodeId) { SchedulerNode node = ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId); - return node != null ? RemoteNode.newInstance(nodeId, node.getHttpAddress()) - : null; + if (node != null) { + RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress()); + rNode.setRackName(node.getRackName()); + return rNode; + } + return null; } private static ApplicationAttemptId getAppAttemptId() throws YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index 9b9eb3c3876..1af930f7fc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -610,6 +610,8 @@ public class TestOpportunisticContainerAllocatorAMService { .newInstance(ExecutionType.OPPORTUNISTIC, true))), null); List allocatedContainers = allocateResponse.getAllocatedContainers(); + allocatedContainers.addAll( + am1.allocate(null, null).getAllocatedContainers()); Assert.assertEquals(2, allocatedContainers.size()); Container container = allocatedContainers.get(0); // Start Container in NM