From 79aeddc88f0e71f0031d5f39fded172de0b29a2e Mon Sep 17 00:00:00 2001 From: Varun Saxena Date: Fri, 28 Oct 2016 00:29:53 +0530 Subject: [PATCH] YARN-5308. FairScheduler: Move continuous scheduling related tests to TestContinuousScheduling (Kai Sasaki via Varun Saxena) --- .../fair/TestContinuousScheduling.java | 189 +++++++++++++++++- .../scheduler/fair/TestFairScheduler.java | 157 --------------- 2 files changed, 187 insertions(+), 159 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index 6188246dd31..5964d2fd12c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -22,20 +22,32 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Assert; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + import org.junit.Before; import org.junit.Test; @@ -43,18 +55,22 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; public class TestContinuousScheduling extends FairSchedulerTestBase { private ControlledClock mockClock; + private static int delayThresholdTimeMs = 1000; @Override public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); conf.setBoolean( FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true); - conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS, 100); - conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS, 100); + conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS, + delayThresholdTimeMs); + conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS, + delayThresholdTimeMs); return conf; } @@ -167,6 +183,175 @@ public void testSortedNodes() throws Exception { Assert.assertEquals(2, nodes.size()); } + @Test + public void testWithNodeRemoved() throws Exception { + // Disable continuous scheduling, will invoke continuous + // scheduling once manually + scheduler = new FairScheduler(); + conf = super.createConfiguration(); + resourceManager = new MockRM(conf); + + // TODO: This test should really be using MockRM. For now starting stuff + // that is needed at a bare minimum. + ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); + resourceManager.getRMContext().getStateStore().start(); + + // to initialize the master key + resourceManager.getRMContext().getContainerTokenSecretManager() + .rollMasterKey(); + + scheduler.setRMContext(resourceManager.getRMContext()); + Assert.assertTrue("Continuous scheduling should be disabled.", + !scheduler.isContinuousSchedulingEnabled()); + scheduler.init(conf); + scheduler.start(); + + // Add two nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + Assert.assertEquals("We should have two alive nodes.", + 2, scheduler.getNumClusterNodes()); + + // Remove one node + NodeRemovedSchedulerEvent removeNode1 + = new NodeRemovedSchedulerEvent(node1); + scheduler.handle(removeNode1); + Assert.assertEquals("We should only have one alive node.", + 1, scheduler.getNumClusterNodes()); + + // Invoke the continuous scheduling once + try { + scheduler.continuousSchedulingAttempt(); + } catch (Exception e) { + fail("Exception happened when doing continuous scheduling. " + + e.toString()); + } + } + + @Test + public void testInterruptedException() + throws Exception { + // Disable continuous scheduling, will invoke continuous + // scheduling once manually + scheduler = new FairScheduler(); + conf = super.createConfiguration(); + resourceManager = new MockRM(conf); + + // TODO: This test should really be using MockRM. For now starting stuff + // that is needed at a bare minimum. + ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); + resourceManager.getRMContext().getStateStore().start(); + + // to initialize the master key + resourceManager.getRMContext().getContainerTokenSecretManager() + .rollMasterKey(); + + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.init(conf); + scheduler.start(); + FairScheduler spyScheduler = spy(scheduler); + Assert.assertTrue("Continuous scheduling should be disabled.", + !spyScheduler.isContinuousSchedulingEnabled()); + // Add one nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + spyScheduler.handle(nodeEvent1); + Assert.assertEquals("We should have one alive node.", + 1, spyScheduler.getNumClusterNodes()); + InterruptedException ie = new InterruptedException(); + doThrow(new YarnRuntimeException(ie)).when(spyScheduler). + attemptScheduling(isA(FSSchedulerNode.class)); + // Invoke the continuous scheduling once + try { + spyScheduler.continuousSchedulingAttempt(); + fail("Expected InterruptedException to stop schedulingThread"); + } catch (InterruptedException e) { + Assert.assertEquals(ie, e); + } + } + + @Test + public void testThreadLifeCycle() throws InterruptedException { + scheduler.start(); + + Thread updateThread = scheduler.updateThread; + Thread schedulingThread = scheduler.schedulingThread; + + assertTrue(updateThread.isAlive()); + assertTrue(schedulingThread.isAlive()); + + scheduler.stop(); + + int numRetries = 100; + while (numRetries-- > 0 && + (updateThread.isAlive() || schedulingThread.isAlive())) { + Thread.sleep(50); + } + + assertNotEquals("One of the threads is still alive", 0, numRetries); + } + + @Test + public void testFairSchedulerContinuousSchedulingInitTime() throws Exception { + scheduler.start(); + + int priorityValue; + Priority priority; + FSAppAttempt fsAppAttempt; + ResourceRequest request1; + ResourceRequest request2; + ApplicationAttemptId id11; + + priorityValue = 1; + id11 = createAppAttemptId(1, 1); + createMockRMApp(id11); + priority = Priority.newInstance(priorityValue); + scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", + false); + scheduler.addApplicationAttempt(id11, false, false); + fsAppAttempt = scheduler.getApplicationAttempt(id11); + + String hostName = "127.0.0.1"; + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(16 * 1024, 16), 1, + hostName); + List ask1 = new ArrayList<>(); + request1 = + createResourceRequest(1024, 8, node1.getRackName(), priorityValue, 1, + true); + request2 = + createResourceRequest(1024, 8, ResourceRequest.ANY, priorityValue, 1, + true); + ask1.add(request1); + ask1.add(request2); + scheduler.allocate(id11, ask1, new ArrayList(), null, null, + null, null); + + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + FSSchedulerNode node = + (FSSchedulerNode) scheduler.getSchedulerNode(node1.getNodeID()); + // Tick the time and let the fsApp startTime different from initScheduler + // time + mockClock.tickSec(delayThresholdTimeMs / 1000); + scheduler.attemptScheduling(node); + Map lastScheduledContainer = + fsAppAttempt.getLastScheduledContainer(); + long initSchedulerTime = + lastScheduledContainer.get(TestUtils.toSchedulerKey(priority)); + assertEquals(delayThresholdTimeMs, initSchedulerTime); + } + private void triggerSchedulingAttempt() throws InterruptedException { Thread.sleep( 2 * scheduler.getConf().getContinuousSchedulingSleepMs()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 9e0dd066d7b..e28b35a3b9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -25,10 +25,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.io.File; @@ -62,7 +59,6 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -99,12 +95,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity - .TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; @@ -4124,71 +4117,6 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception { verifyQueueNumRunnable("queue1.sub3", 0, 0); } - @Test - public void testContinuousSchedulingWithNodeRemoved() throws Exception { - // Disable continuous scheduling, will invoke continuous scheduling once manually - scheduler.init(conf); - scheduler.start(); - Assert.assertTrue("Continuous scheduling should be disabled.", - !scheduler.isContinuousSchedulingEnabled()); - - // Add two nodes - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2, - "127.0.0.2"); - NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); - scheduler.handle(nodeEvent2); - Assert.assertEquals("We should have two alive nodes.", - 2, scheduler.getNumClusterNodes()); - - // Remove one node - NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1); - scheduler.handle(removeNode1); - Assert.assertEquals("We should only have one alive node.", - 1, scheduler.getNumClusterNodes()); - - // Invoke the continuous scheduling once - try { - scheduler.continuousSchedulingAttempt(); - } catch (Exception e) { - fail("Exception happened when doing continuous scheduling. " + - e.toString()); - } - } - - @Test - public void testContinuousSchedulingInterruptedException() - throws Exception { - scheduler.init(conf); - scheduler.start(); - FairScheduler spyScheduler = spy(scheduler); - Assert.assertTrue("Continuous scheduling should be disabled.", - !spyScheduler.isContinuousSchedulingEnabled()); - // Add one nodes - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - spyScheduler.handle(nodeEvent1); - Assert.assertEquals("We should have one alive node.", - 1, spyScheduler.getNumClusterNodes()); - InterruptedException ie = new InterruptedException(); - doThrow(new YarnRuntimeException(ie)).when(spyScheduler). - attemptScheduling(isA(FSSchedulerNode.class)); - // Invoke the continuous scheduling once - try { - spyScheduler.continuousSchedulingAttempt(); - fail("Expected InterruptedException to stop schedulingThread"); - } catch (InterruptedException e) { - Assert.assertEquals(ie, e); - } - } - @Test public void testSchedulingOnRemovedNode() throws Exception { // Disable continuous scheduling, will invoke continuous scheduling manually @@ -4486,30 +4414,6 @@ public void testLowestCommonAncestorDeeperHierarchy() throws Exception { assertEquals(ancestorQueue, queue1); } - @Test - public void testThreadLifeCycle() throws InterruptedException { - conf.setBoolean( - FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true); - scheduler.init(conf); - scheduler.start(); - - Thread updateThread = scheduler.updateThread; - Thread schedulingThread = scheduler.schedulingThread; - - assertTrue(updateThread.isAlive()); - assertTrue(schedulingThread.isAlive()); - - scheduler.stop(); - - int numRetries = 100; - while (numRetries-- > 0 && - (updateThread.isAlive() || schedulingThread.isAlive())) { - Thread.sleep(50); - } - - assertNotEquals("One of the threads is still alive", 0, numRetries); - } - @Test public void testPerfMetricsInited() { scheduler.init(conf); @@ -4644,67 +4548,6 @@ public void testUserAsDefaultQueueWithLeadingTrailingSpaceUserName() .get(attId3.getApplicationId()).getQueue()); } - @Test - public void testFairSchedulerContinuousSchedulingInitTime() throws Exception { - int DELAY_THRESHOLD_TIME_MS = 1000; - conf.set(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, "true"); - conf.set(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS, - String.valueOf(DELAY_THRESHOLD_TIME_MS)); - conf.set(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS, - String.valueOf(DELAY_THRESHOLD_TIME_MS)); - - ControlledClock clock = new ControlledClock(); - scheduler.setClock(clock); - scheduler.init(conf); - scheduler.start(); - - int priorityValue; - Priority priority; - FSAppAttempt fsAppAttempt; - ResourceRequest request1; - ResourceRequest request2; - ApplicationAttemptId id11; - - priorityValue = 1; - id11 = createAppAttemptId(1, 1); - createMockRMApp(id11); - priority = Priority.newInstance(priorityValue); - scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", - false); - scheduler.addApplicationAttempt(id11, false, false); - fsAppAttempt = scheduler.getApplicationAttempt(id11); - - String hostName = "127.0.0.1"; - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(16 * 1024, 16), 1, - hostName); - List ask1 = new ArrayList<>(); - request1 = - createResourceRequest(1024, 8, node1.getRackName(), priorityValue, 1, - true); - request2 = - createResourceRequest(1024, 8, ResourceRequest.ANY, priorityValue, 1, - true); - ask1.add(request1); - ask1.add(request2); - scheduler.allocate(id11, ask1, new ArrayList(), null, null, - null, null); - - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - FSSchedulerNode node = - (FSSchedulerNode) scheduler.getSchedulerNode(node1.getNodeID()); - // Tick the time and let the fsApp startTime different from initScheduler - // time - clock.tickSec(DELAY_THRESHOLD_TIME_MS / 1000); - scheduler.attemptScheduling(node); - Map lastScheduledContainer = - fsAppAttempt.getLastScheduledContainer(); - long initSchedulerTime = - lastScheduledContainer.get(TestUtils.toSchedulerKey(priority)); - assertEquals(DELAY_THRESHOLD_TIME_MS, initSchedulerTime); - } - @Test public void testResourceUpdateDecommissioningNode() throws Exception { // Mock the RMNodeResourceUpdate event handler to update SchedulerNode