diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 3caeb3c3ad1..0e1d90424e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import org.junit.Assert; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -50,7 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.Resources; public class FairSchedulerTestBase { public final static String TEST_DIR = @@ -66,6 +67,8 @@ public class FairSchedulerTestBase { protected FairScheduler scheduler; protected ResourceManager resourceManager; public static final float TEST_RESERVATION_THRESHOLD = 0.09f; + private static final int SLEEP_DURATION = 10; + private static final int SLEEP_RETRIES = 1000; // Helper methods public Configuration createConfiguration() { @@ -260,4 +263,21 @@ public class FairSchedulerTestBase { .put(attemptId.getApplicationId(), app); return app; } + + protected void checkAppConsumption(FSAppAttempt app, Resource resource) + throws InterruptedException { + for (int i = 0; i < SLEEP_RETRIES; i++) { + if (Resources.equals(resource, app.getCurrentConsumption())) { + break; + } else { + Thread.sleep(SLEEP_DURATION); + } + } + + // available resource + Assert.assertEquals(resource.getMemory(), + app.getCurrentConsumption().getMemory()); + Assert.assertEquals(resource.getVirtualCores(), + app.getCurrentConsumption().getVirtualCores()); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index 65c80a6dde7..2e7b3f85e6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -21,9 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -31,13 +33,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import org.junit.Before; import org.junit.Test; import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Set; public class TestContinuousScheduling extends FairSchedulerTestBase { private ControlledClock mockClock; @@ -78,7 +84,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase { } @Test (timeout = 60000) - public void testSchedulingDelay() throws InterruptedException { + public void testBasic() throws InterruptedException { // Add one node String host = "127.0.0.1"; RMNode node1 = MockNodes.newNodeInfo( @@ -88,8 +94,6 @@ public class TestContinuousScheduling extends FairSchedulerTestBase { NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(nodeUpdateEvent); - // Create one application and submit one each of node-local, rack-local - // and ANY requests ApplicationAttemptId appAttemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); createMockRMApp(appAttemptId); @@ -102,11 +106,69 @@ public class TestContinuousScheduling extends FairSchedulerTestBase { appAttemptId, ask, new ArrayList(), null, null, null, null); FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); - // Advance time and let continuous scheduling kick in - mockClock.tickSec(1); - while (1024 != app.getCurrentConsumption().getMemory()) { - Thread.sleep(100); + triggerSchedulingAttempt(); + checkAppConsumption(app, Resources.createResource(1024, 1)); + } + + @Test (timeout = 10000) + public void testSortedNodes() throws Exception { + // Add two nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + // available resource + Assert.assertEquals(scheduler.getClusterResource().getMemory(), 16 * 1024); + Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16); + + // send application request + ApplicationAttemptId appAttemptId = + createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); + createMockRMApp(appAttemptId); + + scheduler.addApplication(appAttemptId.getApplicationId(), + "queue11", "user11", false); + scheduler.addApplicationAttempt(appAttemptId, false, false); + List ask = new ArrayList<>(); + ResourceRequest request = + createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true); + ask.add(request); + scheduler.allocate(appAttemptId, ask, + new ArrayList(), null, null, null, null); + triggerSchedulingAttempt(); + + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); + checkAppConsumption(app, Resources.createResource(1024, 1)); + + // another request + request = + createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true); + ask.clear(); + ask.add(request); + scheduler.allocate(appAttemptId, ask, + new ArrayList(), null, null, null, null); + triggerSchedulingAttempt(); + + checkAppConsumption(app, Resources.createResource(2048,2)); + + // 2 containers should be assigned to 2 nodes + Set nodes = new HashSet(); + Iterator it = app.getLiveContainers().iterator(); + while (it.hasNext()) { + nodes.add(it.next().getContainer().getNodeId()); } - assertEquals(1024, app.getCurrentConsumption().getMemory()); + Assert.assertEquals(2, nodes.size()); + } + + private void triggerSchedulingAttempt() { + mockClock.tickMsec( + 2 * scheduler.getConf().getContinuousSchedulingSleepMs()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 1add1937690..a75b5ceb72c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -3738,81 +3738,6 @@ public class TestFairScheduler extends FairSchedulerTestBase { verifyQueueNumRunnable("queue1.sub3", 0, 0); } - @Test (timeout = 10000) - public void testContinuousScheduling() throws Exception { - // set continuous scheduling enabled - scheduler = new FairScheduler(); - Configuration conf = createConfiguration(); - conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, - true); - scheduler.setRMContext(resourceManager.getRMContext()); - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - Assert.assertTrue("Continuous scheduling should be enabled.", - scheduler.isContinuousSchedulingEnabled()); - - // Add two nodes - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2, - "127.0.0.2"); - NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); - scheduler.handle(nodeEvent2); - - // available resource - Assert.assertEquals(scheduler.getClusterResource().getMemory(), 16 * 1024); - Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16); - - // send application request - ApplicationAttemptId appAttemptId = - createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - createMockRMApp(appAttemptId); - - scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false); - scheduler.addApplicationAttempt(appAttemptId, false, false); - List ask = new ArrayList(); - ResourceRequest request = - createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true); - ask.add(request); - scheduler.allocate(appAttemptId, ask, new ArrayList(), null, null, null, null); - - // waiting for continuous_scheduler_sleep_time - // at least one pass - Thread.sleep(scheduler.getConf().getContinuousSchedulingSleepMs() + 500); - - FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); - // Wait until app gets resources. - while (app.getCurrentConsumption().equals(Resources.none())) { } - - // check consumption - Assert.assertEquals(1024, app.getCurrentConsumption().getMemory()); - Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores()); - - // another request - request = - createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true); - ask.clear(); - ask.add(request); - scheduler.stop(); - scheduler.allocate(appAttemptId, ask, new ArrayList(), null, null, null, null); - scheduler.continuousSchedulingAttempt(); - Assert.assertEquals(2048, app.getCurrentConsumption().getMemory()); - Assert.assertEquals(2, app.getCurrentConsumption().getVirtualCores()); - - // 2 containers should be assigned to 2 nodes - Set nodes = new HashSet(); - Iterator it = app.getLiveContainers().iterator(); - while (it.hasNext()) { - nodes.add(it.next().getContainer().getNodeId()); - } - Assert.assertEquals(2, nodes.size()); - } - @Test public void testContinuousSchedulingWithNodeRemoved() throws Exception { // Disable continuous scheduling, will invoke continuous scheduling once manually