From 179cab81e0bde1af0cba6131ccccf16ff127358a Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Thu, 30 Oct 2014 00:29:07 -0700 Subject: [PATCH] YARN-2712. TestWorkPreservingRMRestart: Augment FS tests with queue and headroom checks. (Tsuyoshi Ozawa via kasha) --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/FairScheduler.java | 1 + .../TestWorkPreservingRMRestart.java | 144 +++++++++++++----- 3 files changed, 114 insertions(+), 34 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2369d503da3..f4e4afa9f86 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -56,6 +56,9 @@ Release 2.7.0 - UNRELEASED YARN-2742. FairSchedulerConfiguration should allow extra spaces between value and unit. (Wei Yan via kasha) + YARN-2712. TestWorkPreservingRMRestart: Augment FS tests with + queue and headroom checks. (Tsuyoshi Ozawa via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index d6339813487..3fc3019a8b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -305,6 +305,7 @@ public class FairScheduler extends // Recursively compute fair shares for all queues // and update metrics rootQueue.recomputeShares(); + updateRootQueueMetrics(); if (LOG.isDebugEnabled()) { if (--updatesToSkipForDebug < 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 80be22ba194..85d38950eac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -18,8 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -47,6 +49,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -65,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; @@ -148,6 +154,9 @@ public class TestWorkPreservingRMRestart { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); rm1 = new MockRM(conf, memStore); + if (schedulerClass.equals(FairScheduler.class)) { + initFairScheduler(rm1); + } rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); @@ -160,6 +169,9 @@ public class TestWorkPreservingRMRestart { // Re-start RM rm2 = new MockRM(conf, memStore); + if (schedulerClass.equals(FairScheduler.class)) { + initFairScheduler(rm2); + } rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); // recover app @@ -227,7 +239,9 @@ public class TestWorkPreservingRMRestart { if (schedulerClass.equals(CapacityScheduler.class)) { checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2); } else if (schedulerClass.equals(FifoScheduler.class)) { - checkFifoQueue(schedulerApp, usedResources, availableResources); + checkFifoQueue(rm2, schedulerApp, usedResources, availableResources); + } else if (schedulerClass.equals(FairScheduler.class)) { + checkFSQueue(rm2, schedulerApp, usedResources, availableResources); } // *********** check scheduler attempt state.******** @@ -239,11 +253,6 @@ public class TestWorkPreservingRMRestart { scheduler.getRMContainer(runningContainer.getContainerId()))); assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources); - // Until YARN-1959 is resolved - if (scheduler.getClass() != FairScheduler.class) { - assertEquals(availableResources, schedulerAttempt.getHeadroom()); - } - // *********** check appSchedulingInfo state *********** assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId()); } @@ -253,23 +262,28 @@ public class TestWorkPreservingRMRestart { Resource clusterResource, Resource queueResource, Resource usedResource, int numContainers) throws Exception { - checkCSLeafQueue(rm2, app, clusterResource, queueResource, usedResource, - numContainers); + checkCSLeafQueue(rm, app, clusterResource, queueResource, usedResource, + numContainers); LeafQueue queue = (LeafQueue) app.getQueue(); - Resource availableResources = Resources.subtract(queueResource, usedResource); + Resource availableResources = + Resources.subtract(queueResource, usedResource); + // ************ check app headroom **************** + SchedulerApplicationAttempt schedulerAttempt = app.getCurrentAppAttempt(); + assertEquals(availableResources, schedulerAttempt.getHeadroom()); + // ************* check Queue metrics ************ QueueMetrics queueMetrics = queue.getMetrics(); - asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), - availableResources.getVirtualCores(), usedResource.getMemory(), - usedResource.getVirtualCores()); + assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResource.getMemory(), + usedResource.getVirtualCores()); // ************ check user metrics *********** QueueMetrics userMetrics = queueMetrics.getUserMetrics(app.getUser()); - asserteMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), - availableResources.getVirtualCores(), usedResource.getMemory(), - usedResource.getVirtualCores()); + assertMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResource.getMemory(), + usedResource.getVirtualCores()); } private void checkCSLeafQueue(MockRM rm, @@ -297,9 +311,10 @@ public class TestWorkPreservingRMRestart { .getTotalConsumedResources()); } - private void checkFifoQueue(SchedulerApplication schedulerApp, - Resource usedResources, Resource availableResources) throws Exception { - FifoScheduler scheduler = (FifoScheduler) rm2.getResourceScheduler(); + private void checkFifoQueue(ResourceManager rm, + SchedulerApplication schedulerApp, Resource usedResources, + Resource availableResources) throws Exception { + FifoScheduler scheduler = (FifoScheduler) rm.getResourceScheduler(); // ************ check cluster used Resources ******** assertEquals(usedResources, scheduler.getUsedResource()); @@ -310,9 +325,68 @@ public class TestWorkPreservingRMRestart { // ************ check queue metrics **************** QueueMetrics queueMetrics = scheduler.getRootQueueMetrics(); - asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), - availableResources.getVirtualCores(), usedResources.getMemory(), - usedResources.getVirtualCores()); + assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResources.getMemory(), + usedResources.getVirtualCores()); + } + + private void checkFSQueue(ResourceManager rm, + SchedulerApplication schedulerApp, Resource usedResources, + Resource availableResources) throws Exception { + // waiting for RM's scheduling apps + int retry = 0; + Resource assumedFairShare = Resource.newInstance(8192, 8); + while (true) { + Thread.sleep(100); + if (assumedFairShare.equals(((FairScheduler)rm.getResourceScheduler()) + .getQueueManager().getRootQueue().getFairShare())) { + break; + } + retry++; + if (retry > 30) { + Assert.fail("Apps are not scheduled within assumed timeout"); + } + } + + FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler(); + FSParentQueue root = scheduler.getQueueManager().getRootQueue(); + // ************ check cluster used Resources ******** + assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy); + assertEquals(usedResources,root.getResourceUsage()); + + // ************ check app headroom **************** + FSAppAttempt schedulerAttempt = + (FSAppAttempt) schedulerApp.getCurrentAppAttempt(); + assertEquals(availableResources, schedulerAttempt.getHeadroom()); + + // ************ check queue metrics **************** + QueueMetrics queueMetrics = scheduler.getRootQueueMetrics(); + assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResources.getMemory(), + usedResources.getVirtualCores()); + } + + private void initFairScheduler(ResourceManager rm) throws IOException { + FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler(); + String testDir = + new File( + System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); + String allocFile = new File(testDir, "test-queues").getAbsolutePath(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile); + + PrintWriter out = new PrintWriter(new FileWriter(allocFile)); + out.println(""); + out.println(""); + out.println("fair"); + out.println(""); + out.println(" drf"); + out.println(" 1.0"); + out.println(" 100"); + out.println(" 120"); + out.println(" .5"); + out.println(""); + out.println(""); + out.close(); } // create 3 container reports for AM @@ -462,9 +536,10 @@ public class TestWorkPreservingRMRestart { checkCSLeafQueue(rm2, schedulerApp1_1, clusterResource, q1Resource, q1UsedResource, 4); QueueMetrics queue1Metrics = schedulerApp1_1.getQueue().getMetrics(); - asserteMetrics(queue1Metrics, 2, 0, 2, 0, 4, - q1availableResources.getMemory(), q1availableResources.getVirtualCores(), - q1UsedResource.getMemory(), q1UsedResource.getVirtualCores()); + assertMetrics(queue1Metrics, 2, 0, 2, 0, 4, + q1availableResources.getMemory(), + q1availableResources.getVirtualCores(), q1UsedResource.getMemory(), + q1UsedResource.getVirtualCores()); // assert queue B state. SchedulerApplication schedulerApp2 = @@ -472,19 +547,20 @@ public class TestWorkPreservingRMRestart { checkCSLeafQueue(rm2, schedulerApp2, clusterResource, q2Resource, q2UsedResource, 2); QueueMetrics queue2Metrics = schedulerApp2.getQueue().getMetrics(); - asserteMetrics(queue2Metrics, 1, 0, 1, 0, 2, - q2availableResources.getMemory(), q2availableResources.getVirtualCores(), - q2UsedResource.getMemory(), q2UsedResource.getVirtualCores()); + assertMetrics(queue2Metrics, 1, 0, 1, 0, 2, + q2availableResources.getMemory(), + q2availableResources.getVirtualCores(), q2UsedResource.getMemory(), + q2UsedResource.getVirtualCores()); // assert parent queue state. LeafQueue leafQueue = (LeafQueue) schedulerApp2.getQueue(); ParentQueue parentQueue = (ParentQueue) leafQueue.getParent(); checkParentQueue(parentQueue, 6, totalUsedResource, (float) 6 / 16, (float) 6 / 16); - asserteMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6, - totalAvailableResource.getMemory(), - totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(), - totalUsedResource.getVirtualCores()); + assertMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6, + totalAvailableResource.getMemory(), + totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(), + totalUsedResource.getVirtualCores()); } //Test that we receive a meaningful exit-causing exception if a queue @@ -818,7 +894,7 @@ public class TestWorkPreservingRMRestart { }, 1000, 20000); } - private void asserteMetrics(QueueMetrics qm, int appsSubmitted, + private void assertMetrics(QueueMetrics qm, int appsSubmitted, int appsPending, int appsRunning, int appsCompleted, int allocatedContainers, int availableMB, int availableVirtualCores, int allocatedMB, int allocatedVirtualCores) {