From 376233cdd4a4ddbde5a92a0627f78338cb4c38b7 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Mon, 22 Sep 2014 09:28:47 -0700 Subject: [PATCH] MAPREDUCE-5279. Made MR headroom calculation honor cpu dimension when YARN scheduler resource type is memory plus cpu. Contributed by Peng Zhang and Varun Vasudev. --- hadoop-mapreduce-project/CHANGES.txt | 4 + .../mapreduce/v2/app/rm/RMCommunicator.java | 12 +- .../v2/app/rm/RMContainerAllocator.java | 270 +++++++++++------- .../v2/app/rm/ResourceCalculatorUtils.java | 52 ++++ .../v2/app/rm/TestRMContainerAllocator.java | 82 ++++-- .../hadoop/yarn/util/resource/Resources.java | 4 +- 6 files changed, 287 insertions(+), 137 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ResourceCalculatorUtils.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 815b3fbe4d5..859c8998a66 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -254,6 +254,10 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5891. Improved shuffle error handling across NM restarts (Junping Du via jlowe) + MAPREDUCE-5279. Made MR headroom calculation honor cpu dimension when YARN + scheduler resource type is memory plus cpu. 
(Peng Zhang and Varun Vasudev + via zjshen) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 6e9f3138b4a..6c58a683d1f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.EnumSet; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; /** * Registers/unregisters to RM and sends heartbeats to RM. 
@@ -90,6 +92,8 @@ public abstract class RMCommunicator extends AbstractService private volatile boolean shouldUnregister = true; private boolean isApplicationMasterRegistered = false; + private EnumSet schedulerResourceTypes; + public RMCommunicator(ClientService clientService, AppContext context) { super("RMCommunicator"); this.clientService = clientService; @@ -98,6 +102,7 @@ public abstract class RMCommunicator extends AbstractService this.applicationId = context.getApplicationID(); this.stopped = new AtomicBoolean(false); this.heartbeatCallbacks = new ConcurrentLinkedQueue(); + this.schedulerResourceTypes = EnumSet.of(SchedulerResourceTypes.MEMORY); } @Override @@ -163,10 +168,11 @@ public abstract class RMCommunicator extends AbstractService setClientToAMToken(response.getClientToAMTokenMasterKey()); } this.applicationACLs = response.getApplicationACLs(); - LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory()); + LOG.info("maxContainerCapability: " + maxContainerCapability); String queue = response.getQueue(); LOG.info("queue: " + queue); job.setQueueName(queue); + this.schedulerResourceTypes.addAll(response.getSchedulerResourceTypes()); } catch (Exception are) { LOG.error("Exception while registering", are); throw new YarnRuntimeException(are); @@ -343,4 +349,8 @@ public abstract class RMCommunicator extends AbstractService protected boolean isApplicationMasterRegistered() { return isApplicationMasterRegistered; } + + public EnumSet getSchedulerResourceTypes() { + return schedulerResourceTypes; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 6cb01918180..fb8771af6a7 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.RackResolver; +import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -149,8 +151,8 @@ public class RMContainerAllocator extends RMContainerRequestor private int lastCompletedTasks = 0; private boolean recalculateReduceSchedule = false; - private int mapResourceRequest;//memory - private int reduceResourceRequest;//memory + private Resource mapResourceRequest = Resources.none(); + private Resource reduceResourceRequest = Resources.none(); private boolean reduceStarted = false; private float maxReduceRampupLimit = 0; @@ -328,49 +330,61 @@ public class RMContainerAllocator extends RMContainerRequestor if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) { ContainerRequestEvent reqEvent = (ContainerRequestEvent) event; JobId jobId = getJob().getID(); - int supportedMaxContainerCapability = - getMaxContainerCapability().getMemory(); + Resource supportedMaxContainerCapability = getMaxContainerCapability(); if 
(reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) { - if (mapResourceRequest == 0) { - mapResourceRequest = reqEvent.getCapability().getMemory(); - eventHandler.handle(new JobHistoryEvent(jobId, - new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP, - mapResourceRequest))); - LOG.info("mapResourceRequest:"+ mapResourceRequest); - if (mapResourceRequest > supportedMaxContainerCapability) { - String diagMsg = "MAP capability required is more than the supported " + - "max container capability in the cluster. Killing the Job. mapResourceRequest: " + - mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability; + if (mapResourceRequest.equals(Resources.none())) { + mapResourceRequest = reqEvent.getCapability(); + eventHandler.handle(new JobHistoryEvent(jobId, + new NormalizedResourceEvent( + org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest + .getMemory()))); + LOG.info("mapResourceRequest:" + mapResourceRequest); + if (mapResourceRequest.getMemory() > supportedMaxContainerCapability + .getMemory() + || mapResourceRequest.getVirtualCores() > supportedMaxContainerCapability + .getVirtualCores()) { + String diagMsg = + "MAP capability required is more than the supported " + + "max container capability in the cluster. Killing the Job. 
mapResourceRequest: " + + mapResourceRequest + " maxContainerCapability:" + + supportedMaxContainerCapability; LOG.info(diagMsg); - eventHandler.handle(new JobDiagnosticsUpdateEvent( - jobId, diagMsg)); + eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg)); eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); } } - //set the rounded off memory - reqEvent.getCapability().setMemory(mapResourceRequest); + // set the resources + reqEvent.getCapability().setMemory(mapResourceRequest.getMemory()); + reqEvent.getCapability().setVirtualCores( + mapResourceRequest.getVirtualCores()); scheduledRequests.addMap(reqEvent);//maps are immediately scheduled } else { - if (reduceResourceRequest == 0) { - reduceResourceRequest = reqEvent.getCapability().getMemory(); - eventHandler.handle(new JobHistoryEvent(jobId, - new NormalizedResourceEvent( - org.apache.hadoop.mapreduce.TaskType.REDUCE, - reduceResourceRequest))); - LOG.info("reduceResourceRequest:"+ reduceResourceRequest); - if (reduceResourceRequest > supportedMaxContainerCapability) { - String diagMsg = "REDUCE capability required is more than the " + - "supported max container capability in the cluster. Killing the " + - "Job. reduceResourceRequest: " + reduceResourceRequest + - " maxContainerCapability:" + supportedMaxContainerCapability; + if (reduceResourceRequest.equals(Resources.none())) { + reduceResourceRequest = reqEvent.getCapability(); + eventHandler.handle(new JobHistoryEvent(jobId, + new NormalizedResourceEvent( + org.apache.hadoop.mapreduce.TaskType.REDUCE, + reduceResourceRequest.getMemory()))); + LOG.info("reduceResourceRequest:" + reduceResourceRequest); + if (reduceResourceRequest.getMemory() > supportedMaxContainerCapability + .getMemory() + || reduceResourceRequest.getVirtualCores() > supportedMaxContainerCapability + .getVirtualCores()) { + String diagMsg = + "REDUCE capability required is more than the " + + "supported max container capability in the cluster. 
Killing the " + + "Job. reduceResourceRequest: " + reduceResourceRequest + + " maxContainerCapability:" + + supportedMaxContainerCapability; LOG.info(diagMsg); - eventHandler.handle(new JobDiagnosticsUpdateEvent( - jobId, diagMsg)); + eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg)); eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); } } - //set the rounded off memory - reqEvent.getCapability().setMemory(reduceResourceRequest); + // set the resources + reqEvent.getCapability().setMemory(reduceResourceRequest.getMemory()); + reqEvent.getCapability().setVirtualCores( + reduceResourceRequest.getVirtualCores()); if (reqEvent.getEarlierAttemptFailed()) { //add to the front of queue for fail fast pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); @@ -425,34 +439,40 @@ public class RMContainerAllocator extends RMContainerRequestor @Private @VisibleForTesting - synchronized void setReduceResourceRequest(int mem) { - this.reduceResourceRequest = mem; + synchronized void setReduceResourceRequest(Resource res) { + this.reduceResourceRequest = res; } @Private @VisibleForTesting - synchronized void setMapResourceRequest(int mem) { - this.mapResourceRequest = mem; + synchronized void setMapResourceRequest(Resource res) { + this.mapResourceRequest = res; } @Private @VisibleForTesting void preemptReducesIfNeeded() { - if (reduceResourceRequest == 0) { - return; //no reduces + if (reduceResourceRequest.equals(Resources.none())) { + return; // no reduces } //check if reduces have taken over the whole cluster and there are //unassigned maps if (scheduledRequests.maps.size() > 0) { - int memLimit = getMemLimit(); - int availableMemForMap = memLimit - ((assignedRequests.reduces.size() - - assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest); - //availableMemForMap must be sufficient to run atleast 1 map - if (availableMemForMap < mapResourceRequest) { - //to make sure new containers are given to maps and not 
reduces - //ramp down all scheduled reduces if any - //(since reduces are scheduled at higher priority than maps) - LOG.info("Ramping down all scheduled reduces:" + scheduledRequests.reduces.size()); + Resource resourceLimit = getResourceLimit(); + Resource availableResourceForMap = + Resources.subtract( + resourceLimit, + Resources.multiply(reduceResourceRequest, + assignedRequests.reduces.size() + - assignedRequests.preemptionWaitingReduces.size())); + // availableMemForMap must be sufficient to run at least 1 map + if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap, + mapResourceRequest, getSchedulerResourceTypes()) <= 0) { + // to make sure new containers are given to maps and not reduces + // ramp down all scheduled reduces if any + // (since reduces are scheduled at higher priority than maps) + LOG.info("Ramping down all scheduled reduces:" + + scheduledRequests.reduces.size()); for (ContainerRequest req : scheduledRequests.reduces.values()) { pendingReduces.add(req); } @@ -462,17 +482,25 @@ public class RMContainerAllocator extends RMContainerRequestor //hanging around for a while int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps); if (hangingMapRequests > 0) { - //preempt for making space for at least one map - int premeptionLimit = Math.max(mapResourceRequest, - (int) (maxReducePreemptionLimit * memLimit)); + // preempt for making space for at least one map + int preemptionReduceNumForOneMap = + ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest, + reduceResourceRequest, getSchedulerResourceTypes()); + int preemptionReduceNumForPreemptionLimit = + ResourceCalculatorUtils.divideAndCeilContainers( + Resources.multiply(resourceLimit, maxReducePreemptionLimit), + reduceResourceRequest, getSchedulerResourceTypes()); + int preemptionReduceNumForAllMaps = + ResourceCalculatorUtils.divideAndCeilContainers( + Resources.multiply(mapResourceRequest, hangingMapRequests), + reduceResourceRequest, 
getSchedulerResourceTypes()); + int toPreempt = + Math.min(Math.max(preemptionReduceNumForOneMap, + preemptionReduceNumForPreemptionLimit), + preemptionReduceNumForAllMaps); - int preemptMem = Math.min(hangingMapRequests * mapResourceRequest, - premeptionLimit); - - int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest); - toPreempt = Math.min(toPreempt, assignedRequests.reduces.size()); - - LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps"); + LOG.info("Going to preempt " + toPreempt + + " due to lack of space for maps"); assignedRequests.preemptReduce(toPreempt); } } @@ -497,7 +525,7 @@ public class RMContainerAllocator extends RMContainerRequestor int totalMaps, int completedMaps, int scheduledMaps, int scheduledReduces, int assignedMaps, int assignedReduces, - int mapResourceReqt, int reduceResourceReqt, + Resource mapResourceReqt, Resource reduceResourceReqt, int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) { @@ -505,8 +533,12 @@ public class RMContainerAllocator extends RMContainerRequestor return; } - int headRoom = getAvailableResources() != null ? 
- getAvailableResources().getMemory() : 0; + // get available resources for this job + Resource headRoom = getAvailableResources(); + if (headRoom == null) { + headRoom = Resources.none(); + } + LOG.info("Recalculating schedule, headroom=" + headRoom); //check for slow start @@ -540,49 +572,60 @@ public class RMContainerAllocator extends RMContainerRequestor completedMapPercent = 1; } - int netScheduledMapMem = - (scheduledMaps + assignedMaps) * mapResourceReqt; + Resource netScheduledMapResource = + Resources.multiply(mapResourceReqt, (scheduledMaps + assignedMaps)); - int netScheduledReduceMem = - (scheduledReduces + assignedReduces) * reduceResourceReqt; + Resource netScheduledReduceResource = + Resources.multiply(reduceResourceReqt, + (scheduledReduces + assignedReduces)); + + Resource finalMapResourceLimit; + Resource finalReduceResourceLimit; - int finalMapMemLimit = 0; - int finalReduceMemLimit = 0; - // ramp up the reduces based on completed map percentage - int totalMemLimit = getMemLimit(); - int idealReduceMemLimit = - Math.min( - (int)(completedMapPercent * totalMemLimit), - (int) (maxReduceRampupLimit * totalMemLimit)); - int idealMapMemLimit = totalMemLimit - idealReduceMemLimit; + Resource totalResourceLimit = getResourceLimit(); + + Resource idealReduceResourceLimit = + Resources.multiply(totalResourceLimit, + Math.min(completedMapPercent, maxReduceRampupLimit)); + Resource ideaMapResourceLimit = + Resources.subtract(totalResourceLimit, idealReduceResourceLimit); // check if there aren't enough maps scheduled, give the free map capacity - // to reduce - if (idealMapMemLimit > netScheduledMapMem) { - int unusedMapMemLimit = idealMapMemLimit - netScheduledMapMem; - finalReduceMemLimit = idealReduceMemLimit + unusedMapMemLimit; - finalMapMemLimit = totalMemLimit - finalReduceMemLimit; + // to reduce. 
+ // Even when container number equals, there may be unused resources in one + // dimension + if (ResourceCalculatorUtils.computeAvailableContainers(ideaMapResourceLimit, + mapResourceReqt, getSchedulerResourceTypes()) >= (scheduledMaps + assignedMaps)) { + // enough resource given to maps, given the remaining to reduces + Resource unusedMapResourceLimit = + Resources.subtract(ideaMapResourceLimit, netScheduledMapResource); + finalReduceResourceLimit = + Resources.add(idealReduceResourceLimit, unusedMapResourceLimit); + finalMapResourceLimit = + Resources.subtract(totalResourceLimit, finalReduceResourceLimit); } else { - finalMapMemLimit = idealMapMemLimit; - finalReduceMemLimit = idealReduceMemLimit; + finalMapResourceLimit = ideaMapResourceLimit; + finalReduceResourceLimit = idealReduceResourceLimit; } - - LOG.info("completedMapPercent " + completedMapPercent + - " totalMemLimit:" + totalMemLimit + - " finalMapMemLimit:" + finalMapMemLimit + - " finalReduceMemLimit:" + finalReduceMemLimit + - " netScheduledMapMem:" + netScheduledMapMem + - " netScheduledReduceMem:" + netScheduledReduceMem); - - int rampUp = - (finalReduceMemLimit - netScheduledReduceMem) / reduceResourceReqt; - + + LOG.info("completedMapPercent " + completedMapPercent + + " totalResourceLimit:" + totalResourceLimit + + " finalMapResourceLimit:" + finalMapResourceLimit + + " finalReduceResourceLimit:" + finalReduceResourceLimit + + " netScheduledMapResource:" + netScheduledMapResource + + " netScheduledReduceResource:" + netScheduledReduceResource); + + int rampUp = + ResourceCalculatorUtils.computeAvailableContainers(Resources.subtract( + finalReduceResourceLimit, netScheduledReduceResource), + reduceResourceReqt, getSchedulerResourceTypes()); + if (rampUp > 0) { rampUp = Math.min(rampUp, numPendingReduces); LOG.info("Ramping up " + rampUp); rampUpReduces(rampUp); - } else if (rampUp < 0){ + } else if (rampUp < 0) { int rampDown = -1 * rampUp; rampDown = Math.min(rampDown, scheduledReduces); 
LOG.info("Ramping down " + rampDown); @@ -618,8 +661,10 @@ public class RMContainerAllocator extends RMContainerRequestor @SuppressWarnings("unchecked") private List getResources() throws Exception { - int headRoom = getAvailableResources() != null - ? getAvailableResources().getMemory() : 0;//first time it would be null + // will be null the first time + Resource headRoom = + getAvailableResources() == null ? Resources.none() : + Resources.clone(getAvailableResources()); AllocateResponse response; /* * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS @@ -670,7 +715,9 @@ public class RMContainerAllocator extends RMContainerRequestor throw new YarnRuntimeException(msg); } } - int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0; + Resource newHeadRoom = + getAvailableResources() == null ? Resources.none() + : getAvailableResources(); List newContainers = response.getAllocatedContainers(); // Setting NMTokens if (response.getNMTokens() != null) { @@ -694,10 +741,11 @@ public class RMContainerAllocator extends RMContainerRequestor new PreemptionContext(assignedRequests), preemptReq); } - if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) { + if (newContainers.size() + finishedContainers.size() > 0 + || !headRoom.equals(newHeadRoom)) { //something changed recalculateReduceSchedule = true; - if (LOG.isDebugEnabled() && headRoom != newHeadRoom) { + if (LOG.isDebugEnabled() && !headRoom.equals(newHeadRoom)) { LOG.debug("headroom=" + newHeadRoom); } } @@ -802,10 +850,18 @@ public class RMContainerAllocator extends RMContainerRequestor } @Private - public int getMemLimit() { - int headRoom = getAvailableResources() != null ? 
getAvailableResources().getMemory() : 0; - return headRoom + assignedRequests.maps.size() * mapResourceRequest + - assignedRequests.reduces.size() * reduceResourceRequest; + public Resource getResourceLimit() { + Resource headRoom = getAvailableResources(); + if (headRoom == null) { + headRoom = Resources.none(); + } + Resource assignedMapResource = + Resources.multiply(mapResourceRequest, assignedRequests.maps.size()); + Resource assignedReduceResource = + Resources.multiply(reduceResourceRequest, + assignedRequests.reduces.size()); + return Resources.add(headRoom, + Resources.add(assignedMapResource, assignedReduceResource)); } @Private @@ -914,10 +970,11 @@ public class RMContainerAllocator extends RMContainerRequestor // a container to be assigned boolean isAssignable = true; Priority priority = allocated.getPriority(); - int allocatedMemory = allocated.getResource().getMemory(); + Resource allocatedResource = allocated.getResource(); if (PRIORITY_FAST_FAIL_MAP.equals(priority) || PRIORITY_MAP.equals(priority)) { - if (allocatedMemory < mapResourceRequest + if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource, + mapResourceRequest, getSchedulerResourceTypes()) <= 0 || maps.isEmpty()) { LOG.info("Cannot assign container " + allocated + " for a map as either " @@ -928,7 +985,8 @@ public class RMContainerAllocator extends RMContainerRequestor } } else if (PRIORITY_REDUCE.equals(priority)) { - if (allocatedMemory < reduceResourceRequest + if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource, + reduceResourceRequest, getSchedulerResourceTypes()) <= 0 || reduces.isEmpty()) { LOG.info("Cannot assign container " + allocated + " for a reduce as either " diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ResourceCalculatorUtils.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ResourceCalculatorUtils.java
new file mode 100644
index 00000000000..b9bc8b595ec
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ResourceCalculatorUtils.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.EnumSet;
+
+/**
+ * Static helpers that convert between {@link Resource} amounts and whole
+ * container counts for the MapReduce application master. Memory is always
+ * taken into account; virtual cores are taken into account only when the
+ * supplied resource types contain {@link SchedulerResourceTypes#CPU}.
+ */
+public class ResourceCalculatorUtils {
+
+  /**
+   * Divides {@code a} by {@code b}, rounding the quotient up.
+   *
+   * @param a the dividend
+   * @param b the divisor
+   * @return the rounded-up quotient, or {@code 0} when {@code b} is zero
+   */
+  public static int divideAndCeil(int a, int b) {
+    if (b == 0) {
+      return 0;
+    }
+    // Widen to long so that a + (b - 1) cannot overflow for large operands.
+    return (int) (((long) a + (b - 1)) / b);
+  }
+
+  /**
+   * Computes how many containers of size {@code required} fit into
+   * {@code available}.
+   *
+   * @param available the resource that can be handed out
+   * @param required the resource needed by a single container
+   * @param resourceTypes the dimensions to honor; virtual cores are only
+   *          considered when this contains CPU
+   * @return the number of whole containers that fit; a dimension in which
+   *         {@code required} is zero never limits the result
+   */
+  public static int computeAvailableContainers(Resource available,
+      Resource required, EnumSet<SchedulerResourceTypes> resourceTypes) {
+    int byMemory =
+        ratioOrMaxValue(available.getMemory(), required.getMemory());
+    if (resourceTypes.contains(SchedulerResourceTypes.CPU)) {
+      return Math.min(byMemory, ratioOrMaxValue(
+          available.getVirtualCores(), required.getVirtualCores()));
+    }
+    return byMemory;
+  }
+
+  /**
+   * Computes how many units of size {@code factor} are needed to cover
+   * {@code required}, rounding up.
+   *
+   * @param required the resource that has to be covered
+   * @param factor the resource provided by a single unit
+   * @param resourceTypes the dimensions to honor; virtual cores are only
+   *          considered when this contains CPU
+   * @return the largest rounded-up quotient over the honored dimensions; a
+   *         dimension in which {@code factor} is zero contributes {@code 0}
+   */
+  public static int divideAndCeilContainers(Resource required, Resource factor,
+      EnumSet<SchedulerResourceTypes> resourceTypes) {
+    if (resourceTypes.contains(SchedulerResourceTypes.CPU)) {
+      return Math.max(divideAndCeil(required.getMemory(), factor.getMemory()),
+          divideAndCeil(required.getVirtualCores(), factor.getVirtualCores()));
+    }
+    return divideAndCeil(required.getMemory(), factor.getMemory());
+  }
+
+  /**
+   * Integer division that treats a zero denominator as "no constraint".
+   *
+   * @return {@code numerator / denominator}, or {@link Integer#MAX_VALUE}
+   *         when {@code denominator} is zero
+   */
+  private static int ratioOrMaxValue(int numerator, int denominator) {
+    if (denominator == 0) {
+      return Integer.MAX_VALUE;
+    }
+    return numerator / denominator;
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 4c03b317b19..341e67354a1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import static org.mockito.Matchers.any;
 import static
org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.isA; @@ -30,19 +31,14 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; import org.apache.hadoop.mapreduce.v2.app.MRApp; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -458,8 +454,8 @@ public class TestRMContainerAllocator { 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob, new SystemClock()); - allocator.setMapResourceRequest(1024); - allocator.setReduceResourceRequest(1024); + allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1)); + allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1)); RMContainerAllocator.AssignedRequests assignedRequests = allocator.getAssignedRequests(); RMContainerAllocator.ScheduledRequests scheduledRequests = @@ -478,7 +474,7 @@ public class TestRMContainerAllocator { @Test(timeout = 30000) public void testNonAggressivelyPreemptReducers() throws Exception { - LOG.info("Running testPreemptReducers"); + LOG.info("Running testNonAggressivelyPreemptReducers"); final int preemptThreshold = 2; //sec Configuration conf = new Configuration(); @@ -513,8 +509,8 @@ public class TestRMContainerAllocator { clock.setTime(1); MyContainerAllocator allocator = new MyContainerAllocator(rm, 
conf, appAttemptId, mockJob, clock); - allocator.setMapResourceRequest(1024); - allocator.setReduceResourceRequest(1024); + allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1)); + allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1)); RMContainerAllocator.AssignedRequests assignedRequests = allocator.getAssignedRequests(); RMContainerAllocator.ScheduledRequests scheduledRequests = @@ -1774,17 +1770,19 @@ public class TestRMContainerAllocator { int scheduledReduces = 0; int assignedMaps = 2; int assignedReduces = 0; - int mapResourceReqt = 1024; - int reduceResourceReqt = 2*1024; + Resource mapResourceReqt = BuilderUtils.newResource(1024, 1); + Resource reduceResourceReqt = BuilderUtils.newResource(2 * 1024, 1); int numPendingReduces = 4; float maxReduceRampupLimit = 0.5f; float reduceSlowStart = 0.2f; RMContainerAllocator allocator = mock(RMContainerAllocator.class); - doCallRealMethod().when(allocator). - scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), - anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat()); - + doCallRealMethod().when(allocator).scheduleReduces(anyInt(), anyInt(), + anyInt(), anyInt(), anyInt(), anyInt(), any(Resource.class), + any(Resource.class), anyInt(), anyFloat(), anyFloat()); + doReturn(EnumSet.of(SchedulerResourceTypes.MEMORY)).when(allocator) + .getSchedulerResourceTypes(); + // Test slow-start allocator.scheduleReduces( totalMaps, succeededMaps, @@ -1808,6 +1806,7 @@ public class TestRMContainerAllocator { verify(allocator, never()).scheduleAllReduces(); succeededMaps = 3; + doReturn(BuilderUtils.newResource(0, 0)).when(allocator).getResourceLimit(); allocator.scheduleReduces( totalMaps, succeededMaps, scheduledMaps, scheduledReduces, @@ -1818,7 +1817,8 @@ public class TestRMContainerAllocator { verify(allocator, times(1)).setIsReduceStarted(true); // Test reduce ramp-up - doReturn(100 * 1024).when(allocator).getMemLimit(); + doReturn(BuilderUtils.newResource(100 * 1024, 100 * 
1)).when(allocator) + .getResourceLimit(); allocator.scheduleReduces( totalMaps, succeededMaps, scheduledMaps, scheduledReduces, @@ -1831,13 +1831,14 @@ public class TestRMContainerAllocator { // Test reduce ramp-down scheduledReduces = 3; - doReturn(10 * 1024).when(allocator).getMemLimit(); + doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator) + .getResourceLimit(); allocator.scheduleReduces( - totalMaps, succeededMaps, - scheduledMaps, scheduledReduces, - assignedMaps, assignedReduces, - mapResourceReqt, reduceResourceReqt, - numPendingReduces, + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator).rampDownReduces(anyInt()); @@ -1846,7 +1847,8 @@ public class TestRMContainerAllocator { // should be invoked twice. scheduledMaps = 2; assignedReduces = 2; - doReturn(10 * 1024).when(allocator).getMemLimit(); + doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator) + .getResourceLimit(); allocator.scheduleReduces( totalMaps, succeededMaps, scheduledMaps, scheduledReduces, @@ -1855,6 +1857,30 @@ public class TestRMContainerAllocator { numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator, times(2)).rampDownReduces(anyInt()); + + doReturn( + EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU)) + .when(allocator).getSchedulerResourceTypes(); + + // Test ramp-down when enough memory but not enough cpu resource + scheduledMaps = 10; + assignedReduces = 0; + doReturn(BuilderUtils.newResource(100 * 1024, 5 * 1)).when(allocator) + .getResourceLimit(); + allocator.scheduleReduces(totalMaps, succeededMaps, scheduledMaps, + scheduledReduces, assignedMaps, assignedReduces, mapResourceReqt, + reduceResourceReqt, numPendingReduces, maxReduceRampupLimit, + reduceSlowStart); + verify(allocator, times(3)).rampDownReduces(anyInt()); + + // Test ramp-down 
when enough cpu but not enough memory resource + doReturn(BuilderUtils.newResource(10 * 1024, 100 * 1)).when(allocator) + .getResourceLimit(); + allocator.scheduleReduces(totalMaps, succeededMaps, scheduledMaps, + scheduledReduces, assignedMaps, assignedReduces, mapResourceReqt, + reduceResourceReqt, numPendingReduces, maxReduceRampupLimit, + reduceSlowStart); + verify(allocator, times(4)).rampDownReduces(anyInt()); } private static class RecalculateContainerAllocator extends MyContainerAllocator { @@ -1868,7 +1894,7 @@ public class TestRMContainerAllocator { @Override public void scheduleReduces(int totalMaps, int completedMaps, int scheduledMaps, int scheduledReduces, int assignedMaps, - int assignedReduces, int mapResourceReqt, int reduceResourceReqt, + int assignedReduces, Resource mapResourceReqt, Resource reduceResourceReqt, int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) { recalculatedReduceSchedule = true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 077c9628831..a205bd1f574 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -18,12 +18,12 @@ package org.apache.hadoop.yarn.util.resource; -import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Records; -@Private +@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"}) @Unstable public class Resources {