diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 751b083319c..f8c4af37dcc 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -117,6 +117,9 @@ Release 2.5.0 - UNRELEASED YARN-1474. Make sechedulers services. (Tsuyoshi Ozawa via kasha) + YARN-1913. With Fair Scheduler, cluster can logjam when all resources are + consumed by AMs (Wei Yan via Sandy Ryza) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index fc7e04762d9..cce2e46cadd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NMToken; @@ -76,6 +77,8 @@ public class SchedulerApplicationAttempt { protected final Resource currentReservation = Resource.newInstance(0, 0); private Resource resourceLimit = Resource.newInstance(0, 0); protected Resource currentConsumption = Resource.newInstance(0, 0); + private Resource 
amResource; + private boolean unmanagedAM = true; protected List newlyAllocatedContainers = new ArrayList(); @@ -106,6 +109,19 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, new AppSchedulingInfo(applicationAttemptId, user, queue, activeUsersManager); this.queue = queue; + + + if (rmContext != null && rmContext.getRMApps() != null && + rmContext.getRMApps() + .containsKey(applicationAttemptId.getApplicationId())) { + ApplicationSubmissionContext appSubmissionContext = + rmContext.getRMApps().get(applicationAttemptId.getApplicationId()) + .getApplicationSubmissionContext(); + if (appSubmissionContext != null) { + amResource = appSubmissionContext.getResource(); + unmanagedAM = appSubmissionContext.getUnmanagedAM(); + } + } } /** @@ -168,6 +184,14 @@ public String getQueueName() { return appSchedulingInfo.getQueueName(); } + public Resource getAMResource() { + return amResource; + } + + public boolean getUnmanagedAM() { + return unmanagedAM; + } + public synchronized RMContainer getRMContainer(ContainerId id) { return liveContainers.get(id); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 0f9d9069204..237cad29c19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -53,6 +53,10 @@ public class AllocationConfiguration { private final int 
userMaxAppsDefault; private final int queueMaxAppsDefault; + // Maximum resource share for each leaf queue that can be used to run AMs + final Map queueMaxAMShares; + private final float queueMaxAMShareDefault; + // ACL's for each queue. Only specifies non-default ACL's from configuration. private final Map> queueAcls; @@ -84,8 +88,9 @@ public class AllocationConfiguration { public AllocationConfiguration(Map minQueueResources, Map maxQueueResources, Map queueMaxApps, Map userMaxApps, - Map queueWeights, int userMaxAppsDefault, - int queueMaxAppsDefault, + Map queueWeights, + Map queueMaxAMShares, int userMaxAppsDefault, + int queueMaxAppsDefault, float queueMaxAMShareDefault, Map schedulingPolicies, SchedulingPolicy defaultSchedulingPolicy, Map minSharePreemptionTimeouts, @@ -97,9 +102,11 @@ public AllocationConfiguration(Map minQueueResources, this.maxQueueResources = maxQueueResources; this.queueMaxApps = queueMaxApps; this.userMaxApps = userMaxApps; + this.queueMaxAMShares = queueMaxAMShares; this.queueWeights = queueWeights; this.userMaxAppsDefault = userMaxAppsDefault; this.queueMaxAppsDefault = queueMaxAppsDefault; + this.queueMaxAMShareDefault = queueMaxAMShareDefault; this.defaultSchedulingPolicy = defaultSchedulingPolicy; this.schedulingPolicies = schedulingPolicies; this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; @@ -116,8 +123,10 @@ public AllocationConfiguration(Configuration conf) { queueWeights = new HashMap(); queueMaxApps = new HashMap(); userMaxApps = new HashMap(); + queueMaxAMShares = new HashMap(); userMaxAppsDefault = Integer.MAX_VALUE; queueMaxAppsDefault = Integer.MAX_VALUE; + queueMaxAMShareDefault = 1.0f; queueAcls = new HashMap>(); minSharePreemptionTimeouts = new HashMap(); defaultMinSharePreemptionTimeout = Long.MAX_VALUE; @@ -184,6 +193,11 @@ public int getQueueMaxApps(String queue) { return (maxApps == null) ? 
queueMaxAppsDefault : maxApps; } + public float getQueueMaxAMShare(String queue) { + Float maxAMShare = queueMaxAMShares.get(queue); + return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare; + } + /** * Get the minimum resource allocation for the given queue. * @return the cap set on this queue, or 0 if not set. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 6c356308918..3a962a8ce52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -209,6 +209,7 @@ public synchronized void reloadAllocations() throws IOException, Map maxQueueResources = new HashMap(); Map queueMaxApps = new HashMap(); Map userMaxApps = new HashMap(); + Map queueMaxAMShares = new HashMap(); Map queueWeights = new HashMap(); Map queuePolicies = new HashMap(); Map minSharePreemptionTimeouts = new HashMap(); @@ -216,6 +217,7 @@ public synchronized void reloadAllocations() throws IOException, new HashMap>(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; + float queueMaxAMShareDefault = 1.0f; long fairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; @@ -282,6 +284,11 @@ public synchronized void reloadAllocations() throws 
IOException, String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); queueMaxAppsDefault = val; + } else if ("queueMaxAMShareDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.min(val, 1.0f); + queueMaxAMShareDefault = val; } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) || "defaultQueueSchedulingMode".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); @@ -306,8 +313,8 @@ public synchronized void reloadAllocations() throws IOException, parent = null; } loadQueue(parent, element, minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, queuePolicies, - minSharePreemptionTimeouts, queueAcls, + queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, + queuePolicies, minSharePreemptionTimeouts, queueAcls, configuredQueues); } @@ -322,8 +329,8 @@ public synchronized void reloadAllocations() throws IOException, } AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, - queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, + queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, + queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout, newPlacementPolicy, configuredQueues); @@ -338,7 +345,8 @@ public synchronized void reloadAllocations() throws IOException, */ private void loadQueue(String parentName, Element element, Map minQueueResources, Map maxQueueResources, Map queueMaxApps, - Map userMaxApps, Map queueWeights, + Map userMaxApps, Map queueMaxAMShares, + Map queueWeights, Map queuePolicies, Map minSharePreemptionTimeouts, Map> queueAcls, @@ -370,6 
+378,11 @@ private void loadQueue(String parentName, Element element, Map String text = ((Text)field.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); queueMaxApps.put(queueName, val); + } else if ("maxAMShare".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.min(val, 1.0f); + queueMaxAMShares.put(queueName, val); } else if ("weight".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); double val = Double.parseDouble(text); @@ -392,8 +405,9 @@ private void loadQueue(String parentName, Element element, Map } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, queuePolicies, - minSharePreemptionTimeouts, queueAcls, configuredQueues); + queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, + queuePolicies, minSharePreemptionTimeouts, queueAcls, + configuredQueues); configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 4dc0bf4ceb8..32edc8a2641 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -267,6 +267,12 @@ private Resource 
assignContainer(FSSchedulerNode node, node.allocateContainer(app.getApplicationId(), allocatedContainer); + // If this container is used to run AM, update the leaf queue's AM usage + if (app.getLiveContainers().size() == 1 && + !app.getUnmanagedAM()) { + queue.addAMResourceUsage(container.getResource()); + } + return container.getResource(); } else { // The desired container won't fit here, so reserve @@ -297,6 +303,14 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { app.addSchedulingOpportunity(priority); + // Check the AM resource usage for the leaf queue + if (app.getLiveContainers().size() == 0 + && !app.getUnmanagedAM()) { + if (!queue.canRunAppAM(app.getAMResource())) { + return Resources.none(); + } + } + ResourceRequest rackLocalRequest = app.getResourceRequest(priority, node.getRackName()); ResourceRequest localRequest = app.getResourceRequest(priority, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index fe738da7d46..cecfbfc8e1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -55,6 +55,9 @@ public class FSLeafQueue extends FSQueue { private long lastTimeAtMinShare; private long lastTimeAtHalfFairShare; + // Track the AM resource usage for this queue + private Resource amResourceUsage; + private final ActiveUsersManager activeUsersManager; public FSLeafQueue(String name, FairScheduler 
scheduler, @@ -63,6 +66,7 @@ public FSLeafQueue(String name, FairScheduler scheduler, this.lastTimeAtMinShare = scheduler.getClock().getTime(); this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); activeUsersManager = new ActiveUsersManager(getMetrics()); + amResourceUsage = Resource.newInstance(0, 0); } public void addApp(FSSchedulerApp app, boolean runnable) { @@ -86,6 +90,10 @@ void addAppSchedulable(AppSchedulable appSched) { */ public boolean removeApp(FSSchedulerApp app) { if (runnableAppScheds.remove(app.getAppSchedulable())) { + // Update AM resource usage + if (app.getAMResource() != null) { + Resources.subtractFrom(amResourceUsage, app.getAMResource()); + } return true; } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) { return false; @@ -284,4 +292,26 @@ public int getNumRunnableApps() { public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; } + + /** + * Check whether this queue can run this application master under the + * maxAMShare limit + * + * @param amResource resource the application master would use + * @return true if this queue can run the application master + */ + public boolean canRunAppAM(Resource amResource) { + float maxAMShare = + scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName()); + Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare); + Resource ifRunAMResource = Resources.add(amResourceUsage, amResource); + return !policy + .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource); + } + + public void addAMResourceUsage(Resource amResource) { + if (amResource != null) { + Resources.addTo(amResourceUsage, amResource); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index
1d77a43ce75..1087c73aa19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -149,4 +149,15 @@ public abstract void computeShares( */ public abstract boolean checkIfUsageOverFairShare( Resource usage, Resource fairShare); + + /** + * Check if a leaf queue's AM resource usage is over its limit under this policy + * + * @param usage {@link Resource} the resource used by application masters + * @param maxAMResource {@link Resource} the maximum allowed resource for + * application masters + * @return true if AM resource usage is over the limit + */ + public abstract boolean checkIfAMResourceUsageOverLimit( + Resource usage, Resource maxAMResource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 4b663d95de8..af674b96056 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -74,6 +74,11 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { return
!Resources.fitsIn(usage, fairShare); } + @Override + public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) { + return !Resources.fitsIn(usage, maxAMResource); + } + @Override public void initialize(Resource clusterCapacity) { comparator.setClusterCapacity(clusterCapacity); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index ca7297ff46c..5976cea5230 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -124,6 +124,11 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare); } + @Override + public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) { + return usage.getMemory() > maxAMResource.getMemory(); + } + @Override public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_ANY; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index d9969446811..0f4309759d4 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -94,6 +94,11 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { "as FifoPolicy only works for FSLeafQueue."); } + @Override + public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) { + return usage.getMemory() > maxAMResource.getMemory(); + } + @Override public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_LEAF; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 5f926763d9b..fb864a2ac70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -20,14 +20,21 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; +import 
org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Clock; @@ -169,4 +176,20 @@ protected void createSchedulingRequestExistingApplication( ask.add(request); scheduler.allocate(attId, ask, new ArrayList(), null, null); } + + protected void createApplicationWithAMResource(ApplicationAttemptId attId, + String queue, String user, Resource amResource) { + RMContext rmContext = resourceManager.getRMContext(); + RMApp rmApp = new RMAppImpl(attId.getApplicationId(), rmContext, conf, + null, null, null, ApplicationSubmissionContext.newInstance(null, null, + null, null, null, false, false, 0, amResource, null), null, null, + 0, null, null); + rmContext.getRMApps().put(attId.getApplicationId(), rmApp); + AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent( + attId.getApplicationId(), queue, user); + scheduler.handle(appAddedEvent); + AppAttemptAddedSchedulerEvent attempAddedEvent = + new AppAttemptAddedSchedulerEvent(attId, false); + scheduler.handle(attempAddedEvent); + } } \ No newline at end of file diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 2a725d8bbf4..2a4992c32ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -174,9 +174,10 @@ public void testAllocationFileParsing() throws Exception { out.println(""); out.println("alice,bob admins"); out.println(""); - // Give queue D a limit of 3 running apps + // Give queue D a limit of 3 running apps and 0.4f maxAMShare out.println(""); out.println("3"); + out.println("0.4"); out.println(""); // Give queue E a preemption timeout of one minute out.println(""); @@ -194,6 +195,8 @@ public void testAllocationFileParsing() throws Exception { out.println("15"); // Set default limit of apps per user to 5 out.println("5"); + // Set default limit of AMResourceShare to 0.5f + out.println("0.5f"); // Give user1 a limit of 10 jobs out.println(""); out.println("10"); @@ -240,6 +243,13 @@ public void testAllocationFileParsing() throws Exception { assertEquals(10, queueConf.getUserMaxApps("user1")); assertEquals(5, queueConf.getUserMaxApps("user2")); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root." 
+ YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueA"), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueB"), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueC"), 0.01); + assertEquals(.4f, queueConf.getQueueMaxAMShare("root.queueD"), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueE"), 0.01); + // Root should get * ACL assertEquals("*", queueConf.getQueueAcl("root", QueueACL.ADMINISTER_QUEUE).getAclString()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index b9f40b3854b..c7141b1d90a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -64,7 +64,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; @@ -73,12 +72,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; 
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; @@ -510,26 +509,14 @@ public void testUserAsDefaultQueue() throws Exception { scheduler.init(conf); scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMContext rmContext = resourceManager.getRMContext(); - Map appsMap = rmContext.getRMApps(); ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); - RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf, - null, null, null, ApplicationSubmissionContext.newInstance(null, null, - null, null, null, false, false, 0, null, null), null, null, 0, null, null); - appsMap.put(appAttemptId.getApplicationId(), rmApp); - - AppAddedSchedulerEvent appAddedEvent = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default", - "user1"); - scheduler.handle(appAddedEvent); - AppAttemptAddedSchedulerEvent attempAddedEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - scheduler.handle(attempAddedEvent); + createApplicationWithAMResource(appAttemptId, "default", "user1", null); assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true) 
.getRunnableAppSchedulables().size()); assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true) .getRunnableAppSchedulables().size()); - assertEquals("root.user1", rmApp.getQueue()); + assertEquals("root.user1", resourceManager.getRMContext().getRMApps() + .get(appAttemptId.getApplicationId()).getQueue()); } @Test @@ -538,21 +525,8 @@ public void testNotUserAsDefaultQueue() throws Exception { scheduler.init(conf); scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMContext rmContext = resourceManager.getRMContext(); - Map appsMap = rmContext.getRMApps(); ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); - RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf, - null, null, null, ApplicationSubmissionContext.newInstance(null, null, - null, null, null, false, false, 0, null, null), null, null, 0, null, null); - appsMap.put(appAttemptId.getApplicationId(), rmApp); - - AppAddedSchedulerEvent appAddedEvent = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default", - "user2"); - scheduler.handle(appAddedEvent); - AppAttemptAddedSchedulerEvent attempAddedEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - scheduler.handle(attempAddedEvent); + createApplicationWithAMResource(appAttemptId, "default", "user2", null); assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true) .getRunnableAppSchedulables().size()); assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true) @@ -2329,6 +2303,121 @@ public void testUserAndQueueMaxRunningApps() throws Exception { verifyQueueNumRunnable("queue1", 2, 1); } + @Test + public void testQueueMaxAMShare() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0.2"); + out.println(""); + out.println(""); + out.close(); + + 
scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(20480, 20), + 0, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeEvent); + scheduler.update(); + + assertEquals("Queue queue1's fair share should be 10240", + 10240, scheduler.getQueueManager().getLeafQueue("queue1", true) + .getFairShare().getMemory()); + + Resource amResource1 = Resource.newInstance(1024, 1); + Resource amResource2 = Resource.newInstance(2048, 2); + int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); + // Exceeds no limits + ApplicationAttemptId attId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource(attId1, "queue1", "user1", amResource1); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1); + FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application1's AM requests 1024 MB memory", + 1024, app1.getAMResource().getMemory()); + assertEquals("Application1's AM should be running", + 1, app1.getLiveContainers().size()); + + // Exceeds no limits + ApplicationAttemptId attId2 = createAppAttemptId(2, 1); + createApplicationWithAMResource(attId2, "queue1", "user1", amResource1); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2); + FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application2's AM requests 1024 MB memory", + 1024, app2.getAMResource().getMemory()); + assertEquals("Application2's AM should be running", + 1, app2.getLiveContainers().size()); + + // Exceeds queue limit + ApplicationAttemptId attId3 = createAppAttemptId(3, 1); + createApplicationWithAMResource(attId3, "queue1", "user1", 
amResource1); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3); + FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application3's AM requests 1024 MB memory", + 1024, app3.getAMResource().getMemory()); + assertEquals("Application3's AM should not be running", + 0, app3.getLiveContainers().size()); + + // Still can run non-AM container + createSchedulingRequestExistingApplication(1024, 1, attId1); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application1 should have two running containers", + 2, app1.getLiveContainers().size()); + + // Remove app1, app3's AM should become running + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = + new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, false); + scheduler.update(); + scheduler.handle(appRemovedEvent1); + scheduler.handle(updateEvent); + assertEquals("Application1's AM should be finished", + 0, app1.getLiveContainers().size()); + assertEquals("Application3's AM should be running", + 1, app3.getLiveContainers().size()); + + // Exceeds queue limit + ApplicationAttemptId attId4 = createAppAttemptId(4, 1); + createApplicationWithAMResource(attId4, "queue1", "user1", amResource2); + createSchedulingRequestExistingApplication(2048, 2, amPriority, attId4); + FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application4's AM requests 2048 MB memory", + 2048, app4.getAMResource().getMemory()); + assertEquals("Application4's AM should not be running", + 0, app4.getLiveContainers().size()); + + // Remove app2 and app3, app4's AM should become running + AppAttemptRemovedSchedulerEvent appRemovedEvent2 = + new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, false); + AppAttemptRemovedSchedulerEvent appRemovedEvent3 = + new AppAttemptRemovedSchedulerEvent(attId3, 
RMAppAttemptState.FINISHED, false); + scheduler.handle(appRemovedEvent2); + scheduler.handle(appRemovedEvent3); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application2's AM should be finished", + 0, app2.getLiveContainers().size()); + assertEquals("Application3's AM should be finished", + 0, app3.getLiveContainers().size()); + assertEquals("Application4's AM should be running", + 1, app4.getLiveContainers().size()); + } + @Test public void testMaxRunningAppsHierarchicalQueues() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index 54daf2da859..23faf27bf26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -237,6 +237,11 @@ Allocation file format * maxRunningApps: limit the number of apps from the queue to run at once + * maxAMShare: limit the fraction of the queue's fair share that can be used + to run application masters. This property can only be used for leaf queues. + Default value is 1.0f, which means AMs in the leaf queue can take up to 100% + of both the memory and CPU fair share. + * weight: to share the cluster non-proportionally with other queues. Weights default to 1, and a queue with weight 2 should receive approximately twice as many resources as a queue with the default weight. @@ -279,6 +284,9 @@ Allocation file format * <>, which sets the default running app limit for queues; overriden by maxRunningApps element in each queue. + * <>, which sets the default AM resource + limit for queues; overridden by the maxAMShare element in each queue. + * <>, which sets the default scheduling policy for queues; overriden by the schedulingPolicy element in each queue if specified.
Defaults to "fair". @@ -328,6 +336,7 @@ Allocation file format 10000 mb,0vcores 90000 mb,0vcores 50 + 0.1 2.0 fair @@ -336,6 +345,8 @@ Allocation file format + 0.5 +