From 283fa33febe043bd7b4fa87546be26c9c5a8f8b5 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Wed, 9 Nov 2016 00:11:25 -0800 Subject: [PATCH] YARN-5823. Update NMTokens in case of requests with only opportunistic containers. (Konstantinos Karanasos via asuresh) --- .../TestOpportunisticContainerAllocation.java | 71 ++++++++++++++++++- .../OpportunisticContainerAllocator.java | 55 +++++++------- .../ContainerManagerImpl.java | 2 +- .../scheduler/DistributedScheduler.java | 19 +++-- .../ApplicationMasterService.java | 3 +- ...ortunisticContainerAllocatorAMService.java | 23 +++++- 6 files changed, 137 insertions(+), 36 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java index b9b4b02a14b..ace145d7e19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java @@ -229,6 +229,9 @@ public class TestOpportunisticContainerAllocation { amClient.registerApplicationMaster("Host", 10000, ""); + testOpportunisticAllocation( + (AMRMClientImpl) amClient); + testAllocation((AMRMClientImpl)amClient); amClient @@ -247,7 +250,6 @@ public class TestOpportunisticContainerAllocation { final AMRMClientImpl amClient) throws YarnException, IOException { // setup container request - assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); @@ -388,6 +390,73 @@ public class TestOpportunisticContainerAllocation { assertEquals(0, amClient.release.size()); } + /** + * Tests allocation with requests comprising only opportunistic containers. 
+ */ + private void testOpportunisticAllocation( + final AMRMClientImpl amClient) + throws YarnException, IOException { + // setup container request + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + + int oppContainersRequestedAny = + amClient.getTable(0).get(priority, ResourceRequest.ANY, + ExecutionType.OPPORTUNISTIC, capability).remoteRequest + .getNumContainers(); + + assertEquals(2, oppContainersRequestedAny); + + assertEquals(1, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + int iterationsLeft = 10; + Set releases = new TreeSet<>(); + + amClient.getNMTokenCache().clearCache(); + Assert.assertEquals(0, + amClient.getNMTokenCache().numberOfTokensInCache()); + HashMap receivedNMTokens = new HashMap<>(); + + while (allocatedContainerCount < oppContainersRequestedAny + && iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + for (Container container : allocResponse.getAllocatedContainers()) { + allocatedContainerCount++; + ContainerId rejectContainerId = container.getId(); + releases.add(rejectContainerId); + } + + for (NMToken token : allocResponse.getNMTokens()) { + String nodeID = token.getNodeId().toString(); + receivedNMTokens.put(nodeID, token.getToken()); + } + + if (allocatedContainerCount < oppContainersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } + } + 
+ assertEquals(1, receivedNMTokens.values().size()); + } + private void sleep(int sleepTime) { try { Thread.sleep(sleepTime); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 4410db12c93..16436bdeec6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -157,12 +156,18 @@ public class OpportunisticContainerAllocator { } } - static class PartitionedResourceRequests { + /** + * Class that includes two lists of {@link ResourceRequest}s: one for + * GUARANTEED and one for OPPORTUNISTIC {@link ResourceRequest}s. + */ + public static class PartitionedResourceRequests { private List guaranteed = new ArrayList<>(); private List opportunistic = new ArrayList<>(); + public List getGuaranteed() { return guaranteed; } + public List getOpportunistic() { return opportunistic; } @@ -186,10 +191,10 @@ public class OpportunisticContainerAllocator { } /** - * Entry point into the Opportunistic Container Allocator. + * Allocate OPPORTUNISTIC containers. 
* @param request AllocateRequest * @param applicationAttemptId ApplicationAttemptId - * @param appContext App Specific OpportunisticContainerContext + * @param opportContext App specific OpportunisticContainerContext * @param rmIdentifier RM Identifier * @param appSubmitter App Submitter * @return List of Containers. @@ -197,37 +202,31 @@ public class OpportunisticContainerAllocator { */ public List allocateContainers( AllocateRequest request, ApplicationAttemptId applicationAttemptId, - OpportunisticContainerContext appContext, long rmIdentifier, + OpportunisticContainerContext opportContext, long rmIdentifier, String appSubmitter) throws YarnException { - // Partition requests into GUARANTEED and OPPORTUNISTIC reqs - PartitionedResourceRequests partitionedAsks = - partitionAskList(request.getAskList()); - - if (partitionedAsks.getOpportunistic().isEmpty()) { - return Collections.emptyList(); - } - + // Update released containers. List releasedContainers = request.getReleaseList(); int numReleasedContainers = releasedContainers.size(); if (numReleasedContainers > 0) { LOG.info("AttemptID: " + applicationAttemptId + " released: " + numReleasedContainers); - appContext.getContainersAllocated().removeAll(releasedContainers); + opportContext.getContainersAllocated().removeAll(releasedContainers); } - // Also, update black list + // Update black list. ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest(); if (rbr != null) { - appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); - appContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); + opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); + opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); } - // Add OPPORTUNISTIC reqs to the outstanding reqs - appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic()); + // Add OPPORTUNISTIC requests to the outstanding ones. 
+ opportContext.addToOutstandingReqs(request.getAskList()); + // Satisfy the outstanding OPPORTUNISTIC requests. List allocatedContainers = new ArrayList<>(); for (Priority priority : - appContext.getOutstandingOpReqs().descendingKeySet()) { + opportContext.getOutstandingOpReqs().descendingKeySet()) { // Allocated containers : // Key = Requested Capability, // Value = List of Containers of given cap (the actual container size @@ -235,16 +234,14 @@ public class OpportunisticContainerAllocator { // we need the requested capability (key) to match against // the outstanding reqs) Map> allocated = allocate(rmIdentifier, - appContext, priority, applicationAttemptId, appSubmitter); + opportContext, priority, applicationAttemptId, appSubmitter); for (Map.Entry> e : allocated.entrySet()) { - appContext.matchAllocationToOutstandingRequest( + opportContext.matchAllocationToOutstandingRequest( e.getKey(), e.getValue()); allocatedContainers.addAll(e.getValue()); } } - // Send all the GUARANTEED Reqs to RM - request.setAskList(partitionedAsks.getGuaranteed()); return allocatedContainers; } @@ -359,8 +356,14 @@ public class OpportunisticContainerAllocator { return containerToken; } - private PartitionedResourceRequests partitionAskList(List - askList) { + /** + * Partitions a list of ResourceRequest to two separate lists, one for + * GUARANTEED and one for OPPORTUNISTIC ResourceRequests. 
+ * @param askList the list of ResourceRequests to be partitioned + * @return the partitioned ResourceRequests + */ + public PartitionedResourceRequests partitionAskList( + List askList) { PartitionedResourceRequests partitionedRequests = new PartitionedResourceRequests(); for (ResourceRequest rr : askList) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 76933ec27f9..ab5827e5ed2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -1523,7 +1523,7 @@ public class ContainerManagerImpl extends CompositeService implements @Override public OpportunisticContainersStatus getOpportunisticContainersStatus() { - return null; + return OpportunisticContainersStatus.newInstance(); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java index a12d16a6317..0f47c9319b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java @@ -220,16 +220,27 @@ public final class DistributedScheduler extends AbstractRequestInterceptor { public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( DistributedSchedulingAllocateRequest request) throws YarnException, IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Forwarding allocate request to the" + - "Distributed Scheduler Service on YARN RM"); - } + + // Partition requests to GUARANTEED and OPPORTUNISTIC. + OpportunisticContainerAllocator.PartitionedResourceRequests + partitionedAsks = containerAllocator + .partitionAskList(request.getAllocateRequest().getAskList()); + + // Allocate OPPORTUNISTIC containers. + request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic()); List allocatedContainers = containerAllocator.allocateContainers( request.getAllocateRequest(), applicationAttemptId, oppContainerContext, rmIdentifier, appSubmitter); + // Prepare request for sending to RM for scheduling GUARANTEED containers. 
request.setAllocatedContainers(allocatedContainers); + request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Forwarding allocate request to the " + + "Distributed Scheduler Service on YARN RM"); + } DistributedSchedulingAllocateResponse dsResp = getNextInterceptor().allocateForDistributedScheduling(request); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 4d73ba23eeb..4f952b7001c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -542,7 +542,8 @@ public class ApplicationMasterService extends AbstractService implements RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class); - if (!allocation.getContainers().isEmpty()) { + if (allocation.getNMTokens() != null && + !allocation.getNMTokens().isEmpty()) { allocateResponse.setNMTokens(allocation.getNMTokens()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index
bdd57183fec..50a9c4d89b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -220,34 +220,51 @@ public class OpportunisticContainerAllocatorAMService public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { + // Partition requests to GUARANTEED and OPPORTUNISTIC. + OpportunisticContainerAllocator.PartitionedResourceRequests + partitionedAsks = + oppContainerAllocator.partitionAskList(request.getAskList()); + + // Allocate OPPORTUNISTIC containers. + request.setAskList(partitionedAsks.getOpportunistic()); final ApplicationAttemptId appAttemptId = getAppAttemptId(); SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler) rmContext.getScheduler()).getApplicationAttempt(appAttemptId); + OpportunisticContainerContext oppCtx = appAttempt.getOpportunisticContainerContext(); oppCtx.updateNodeList(getLeastLoadedNodes()); + List oppContainers = oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx, ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); + // Create RMContainers and update the NMTokens. if (!oppContainers.isEmpty()) { handleNewContainers(oppContainers, false); appAttempt.updateNMTokens(oppContainers); } - // Allocate all guaranteed containers + // Allocate GUARANTEED containers. + request.setAskList(partitionedAsks.getGuaranteed()); AllocateResponse allocateResp = super.allocate(request); + // Add allocated OPPORTUNISTIC containers to the AllocateResponse. 
+ if (!oppContainers.isEmpty()) { + allocateResp.getAllocatedContainers().addAll(oppContainers); + } + + // Update opportunistic container context with the allocated GUARANTEED + // containers. oppCtx.updateCompletedContainers(allocateResp); // Add all opportunistic containers - allocateResp.getAllocatedContainers().addAll(oppContainers); return allocateResp; } @Override public RegisterDistributedSchedulingAMResponse - registerApplicationMasterForDistributedScheduling( + registerApplicationMasterForDistributedScheduling( RegisterApplicationMasterRequest request) throws YarnException, IOException { RegisterApplicationMasterResponse response =