From 96e3027e46a953ca995e4b44ef50bc2a30c7e838 Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Mon, 22 Apr 2019 09:49:03 -0700 Subject: [PATCH] YARN-2889. Limit the number of opportunistic container allocated per AM heartbeat. Contributed by Abhishek Modi. --- .../hadoop/yarn/conf/YarnConfiguration.java | 11 ++ .../src/main/resources/yarn-default.xml | 9 + .../OpportunisticContainerAllocator.java | 70 ++++++- .../TestOpportunisticContainerAllocator.java | 186 ++++++++++++++++++ .../yarn/server/nodemanager/NodeManager.java | 8 +- ...ortunisticContainerAllocatorAMService.java | 7 +- 6 files changed, 283 insertions(+), 8 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 860227e2167..b21d76369e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -402,6 +402,17 @@ public class YarnConfiguration extends Configuration { public static final boolean DEFAULT_OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = false; + /** + * Maximum number of opportunistic containers to be allocated in + * AM heartbeat. + */ + @Unstable + public static final String + OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT = + RM_PREFIX + "opportunistic.max.container-allocation.per.am.heartbeat"; + public static final int + DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT = -1; + /** Number of nodes to be used by the Opportunistic Container allocator for * dispatching containers during container allocation. 
*/ @Unstable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e8659429b65..a00b5d6ba4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3340,6 +3340,15 @@ false + + + Maximum number of opportunistic containers to be allocated per + Application Master heartbeat. + + yarn.resourcemanager.opportunistic.max.container-allocation.per.am.heartbeat + -1 + + Number of nodes to be used by the Opportunistic Container Allocator for diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index b31bd69a9fd..10c24022daa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.scheduler; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; @@ -70,6 +71,8 @@ public class OpportunisticContainerAllocator { private static final int RACK_LOCAL_LOOP = 1; private static final int OFF_SWITCH_LOOP = 2; + private int maxAllocationsPerAMHeartbeat = -1; + /** * This class encapsulates application specific parameters used to build a * 
Container. @@ -291,6 +294,24 @@ public class OpportunisticContainerAllocator { this.tokenSecretManager = tokenSecretManager; } + /** + * Create a new Opportunistic Container Allocator. + * @param tokenSecretManager TokenSecretManager + * @param maxAllocationsPerAMHeartbeat max number of containers to be + * allocated in one AM heartbeat + */ + public OpportunisticContainerAllocator( + BaseContainerTokenSecretManager tokenSecretManager, + int maxAllocationsPerAMHeartbeat) { + this.tokenSecretManager = tokenSecretManager; + this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat; + } + + @VisibleForTesting + void setMaxAllocationsPerAMHeartbeat(int maxAllocationsPerAMHeartbeat) { + this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat; + } + /** * Allocate OPPORTUNISTIC containers. * @param blackList Resource BlackList Request @@ -316,7 +337,6 @@ public class OpportunisticContainerAllocator { // Add OPPORTUNISTIC requests to the outstanding ones. opportContext.addToOutstandingReqs(oppResourceReqs); - Set nodeBlackList = new HashSet<>(opportContext.getBlacklist()); Set allocatedNodes = new HashSet<>(); List allocatedContainers = new ArrayList<>(); @@ -334,9 +354,21 @@ public class OpportunisticContainerAllocator { // might be different than what is requested, which is why // we need the requested capability (key) to match against // the outstanding reqs) + int remAllocs = -1; + if (maxAllocationsPerAMHeartbeat > 0) { + remAllocs = + maxAllocationsPerAMHeartbeat - allocatedContainers.size() + - getTotalAllocations(allocations); + if (remAllocs <= 0) { + LOG.info("Not allocating more containers as we have reached max " + + "allocations per AM heartbeat {}", + maxAllocationsPerAMHeartbeat); + break; + } + } Map> allocation = allocate( rmIdentifier, opportContext, schedulerKey, applicationAttemptId, - appSubmitter, nodeBlackList, allocatedNodes); + appSubmitter, nodeBlackList, allocatedNodes, remAllocs); if (allocation.size() > 0) { 
allocations.add(allocation); continueLoop = true; @@ -356,17 +388,42 @@ public class OpportunisticContainerAllocator { return allocatedContainers; } + private int getTotalAllocations( + List>> allocations) { + int totalAllocs = 0; + for (Map> allocation : allocations) { + for (List allocs : allocation.values()) { + totalAllocs += allocs.size(); + } + } + return totalAllocs; + } + private Map> allocate(long rmIdentifier, OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, ApplicationAttemptId appAttId, String userName, Set blackList, - Set allocatedNodes) + Set allocatedNodes, int maxAllocations) throws YarnException { Map> containers = new HashMap<>(); for (EnrichedResourceRequest enrichedAsk : appContext.getOutstandingOpReqs().get(schedKey).values()) { + int remainingAllocs = -1; + if (maxAllocations > 0) { + int totalAllocated = 0; + for (List allocs : containers.values()) { + totalAllocated += allocs.size(); + } + remainingAllocs = maxAllocations - totalAllocated; + if (remainingAllocs <= 0) { + LOG.info("Not allocating more containers as max allocations per AM " + + "heartbeat {} has reached", maxAllocationsPerAMHeartbeat); + break; + } + } allocateContainersInternal(rmIdentifier, appContext.getAppParams(), appContext.getContainerIdGenerator(), blackList, allocatedNodes, - appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk); + appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk, + remainingAllocs); ResourceRequest anyAsk = enrichedAsk.getRequest(); if (!containers.isEmpty()) { LOG.info("Opportunistic allocation requested for [priority={}, " @@ -384,7 +441,7 @@ public class OpportunisticContainerAllocator { Set blacklist, Set allocatedNodes, ApplicationAttemptId id, Map allNodes, String userName, Map> allocations, - EnrichedResourceRequest enrichedAsk) + EnrichedResourceRequest enrichedAsk, int maxAllocations) throws YarnException { if (allNodes.size() == 0) { LOG.info("No nodes currently available to " + @@ 
-397,6 +454,9 @@ public class OpportunisticContainerAllocator { allocations.get(anyAsk.getCapability()).size()); toAllocate = Math.min(toAllocate, appParams.getMaxAllocationsPerSchedulerKeyPerRound()); + if (maxAllocations >= 0) { + toAllocate = Math.min(maxAllocations, toAllocate); + } int numAllocated = 0; // Node Candidates are selected as follows: // * Node local candidates selected in loop == 0 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java index 65ad74870f8..57e397d0102 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java @@ -643,4 +643,190 @@ public class TestOpportunisticContainerAllocator { Assert.assertEquals(1, containers.size()); Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size()); } + + /** + * Tests maximum number of opportunistic containers that can be allocated in + * AM heartbeat. 
+ * @throws Exception + */ + @Test + public void testMaxAllocationsPerAMHeartbeat() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + allocator.setMaxAllocationsPerAMHeartbeat(2); + final Priority priority = Priority.newInstance(1); + final ExecutionTypeRequest oppRequest = ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true); + final Resource resource = Resources.createResource(1 * GB); + List reqs = + Arrays.asList( + ResourceRequest.newInstance(priority, "*", + resource, 3, true, null, oppRequest), + ResourceRequest.newInstance(priority, "h6", + resource, 3, true, null, oppRequest), + ResourceRequest.newInstance(priority, "/r3", + resource, 3, true, null, oppRequest)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1"); + LOG.info("Containers: {}", containers); + // Although capacity is present, only 2 containers should be allocated + // as max allocation per AM heartbeat is set to 2. + Assert.assertEquals(2, containers.size()); + containers = allocator.allocateContainers( + blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "user1"); + LOG.info("Containers: {}", containers); + // Remaining 1 container should be allocated. + Assert.assertEquals(1, containers.size()); + } + + /** + * Tests maximum opportunistic container allocation per AM heartbeat for + * allocation requests with different scheduler key. 
+ * @throws Exception + */ + @Test + public void testMaxAllocationsPerAMHeartbeatDifferentSchedKey() + throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + allocator.setMaxAllocationsPerAMHeartbeat(2); + final ExecutionTypeRequest oppRequest = ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true); + final Resource resource = Resources.createResource(1 * GB); + List reqs = + Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + resource, 1, true, null, oppRequest), + ResourceRequest.newInstance(Priority.newInstance(2), "h6", + resource, 2, true, null, oppRequest), + ResourceRequest.newInstance(Priority.newInstance(3), "/r3", + resource, 2, true, null, oppRequest)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1"); + LOG.info("Containers: {}", containers); + // Although capacity is present, only 2 containers should be allocated + // as max allocation per AM heartbeat is set to 2. + Assert.assertEquals(2, containers.size()); + containers = allocator.allocateContainers( + blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "user1"); + LOG.info("Containers: {}", containers); + // 2 more containers should be allocated from pending allocation requests. 
+ Assert.assertEquals(2, containers.size()); + containers = allocator.allocateContainers( + blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "user1"); + LOG.info("Containers: {}", containers); + // Remaining 1 container should be allocated. + Assert.assertEquals(1, containers.size()); + } + + /** + * Tests maximum opportunistic container allocation per AM heartbeat when + * limit is set to -1. + * @throws Exception + */ + @Test + public void testMaxAllocationsPerAMHeartbeatWithNoLimit() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + allocator.setMaxAllocationsPerAMHeartbeat(-1); + + Priority priority = Priority.newInstance(1); + Resource capability = Resources.createResource(1 * GB); + List reqs = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) + .priority(priority) + .resourceName("h1") + .capability(capability) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + } + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1"); + + // all containers should be allocated in single heartbeat. + Assert.assertEquals(20, containers.size()); + } + + /** + * Tests maximum opportunistic container allocation per AM heartbeat when + * limit is set to higher value. 
+ * @throws Exception + */ + @Test + public void testMaxAllocationsPerAMHeartbeatWithHighLimit() + throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + allocator.setMaxAllocationsPerAMHeartbeat(100); + + Priority priority = Priority.newInstance(1); + Resource capability = Resources.createResource(1 * GB); + List reqs = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) + .priority(priority) + .resourceName("h1") + .capability(capability) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + } + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1"); + + // all containers should be allocated in single heartbeat. 
+ Assert.assertEquals(20, containers.size()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 1ed1fdaabdb..89e3b478d1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -473,10 +473,14 @@ public class NodeManager extends CompositeService .getContainersMonitor(), this.aclsManager, dirsHandler); addService(webServer); ((NMContext) context).setWebServer(webServer); - + int maxAllocationsPerAMHeartbeat = conf.getInt( + YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT, + YarnConfiguration. 
+ DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT); ((NMContext) context).setQueueableContainerAllocator( new OpportunisticContainerAllocator( - context.getContainerTokenSecretManager())); + context.getContainerTokenSecretManager(), + maxAllocationsPerAMHeartbeat)); dispatcher.register(ContainerManagerEventType.class, containerManager); dispatcher.register(NodeManagerEventType.class, this); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 9e861bd3a39..a360ed2b652 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -229,8 +229,13 @@ public class OpportunisticContainerAllocatorAMService YarnScheduler scheduler) { super(OpportunisticContainerAllocatorAMService.class.getName(), rmContext, scheduler); + int maxAllocationsPerAMHeartbeat = rmContext.getYarnConfiguration().getInt( + YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT, + YarnConfiguration. 
+ DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT); this.oppContainerAllocator = new OpportunisticContainerAllocator( - rmContext.getContainerTokenSecretManager()); + rmContext.getContainerTokenSecretManager(), + maxAllocationsPerAMHeartbeat); this.k = rmContext.getYarnConfiguration().getInt( YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED, YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED);