diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index cc26923d217..66079e22c49 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -14,6 +14,9 @@ Release 2.8.0 - UNRELEASED YARN-3273. Improve scheduler UI to facilitate scheduling analysis and debugging. (Rohith Sharmaks via jianhe) + YARN-3357. Move TestFifoScheduler to FIFO package. (Rohith Sharmaks + via devaraj) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index d1f0ede4a33..ca5c7a41e55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -55,7 +55,8 @@ import java.util.Map; import static java.lang.Thread.sleep; public class TestApplicationMasterService { - private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); + private static final Log LOG = LogFactory + .getLog(TestApplicationMasterService.class); private final int GB = 1024; private static YarnConfiguration conf; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java deleted file mode 100644 index b74faebaf5d..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ /dev/null @@ -1,629 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.server.resourcemanager; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestFifoScheduler { - private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); - - private final int GB = 1024; - private static YarnConfiguration conf; - - @BeforeClass - public static void setup() { - conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, - FifoScheduler.class, ResourceScheduler.class); - } - - @Test (timeout = 30000) - public void testConfValidation() throws Exception { - FifoScheduler scheduler = new FifoScheduler(); - Configuration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); - try { - scheduler.serviceInit(conf); - fail("Exception is expected because the min memory allocation is" + - " larger than the max memory allocation."); - } catch (YarnRuntimeException e) { - // Exception is expected. - assertTrue("The thrown exception is not the expected one.", - e.getMessage().startsWith( - "Invalid resource scheduler memory")); - } - } - - @Test - public void testAllocateContainerOnNodeWithoutOffSwitchSpecified() - throws Exception { - Logger rootLogger = LogManager.getRootLogger(); - rootLogger.setLevel(Level.DEBUG); - - MockRM rm = new MockRM(conf); - rm.start(); - MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); - - RMApp app1 = rm.submitApp(2048); - // kick the scheduling, 2 GB given to AM1, remaining 4GB on nm1 - nm1.nodeHeartbeat(true); - RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); - MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); - am1.registerAppAttempt(); - - // add request for containers - List requests = new ArrayList(); - requests.add(am1.createResourceReq("127.0.0.1", 1 * GB, 1, 1)); - requests.add(am1.createResourceReq("/default-rack", 1 * GB, 1, 1)); - am1.allocate(requests, null); // send the request - - try { - // kick the schedule - nm1.nodeHeartbeat(true); - } catch (NullPointerException e) { - Assert.fail("NPE when allocating container on node but " - + "forget to set off-switch request should be handled"); - } - rm.stop(); - } - - @Test - public void test() throws Exception { - Logger rootLogger = LogManager.getRootLogger(); - rootLogger.setLevel(Level.DEBUG); - MockRM rm = new MockRM(conf); - rm.start(); - MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); - MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB); - - RMApp app1 = rm.submitApp(2048); - // kick the scheduling, 2 GB given to AM1, remaining 4GB on nm1 - nm1.nodeHeartbeat(true); - RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); - MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); - am1.registerAppAttempt(); - SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( - nm1.getNodeId()); - Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory()); - - RMApp app2 = rm.submitApp(2048); - // kick the scheduling, 2GB given to AM, remaining 2 GB on nm2 - nm2.nodeHeartbeat(true); - RMAppAttempt attempt2 = app2.getCurrentAppAttempt(); - MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId()); - am2.registerAppAttempt(); - SchedulerNodeReport report_nm2 = rm.getResourceScheduler().getNodeReport( - nm2.getNodeId()); - Assert.assertEquals(2 * GB, report_nm2.getUsedResource().getMemory()); - - // add request for containers - am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, GB, 1, 1); - AllocateResponse alloc1Response = am1.schedule(); // send the request - // add request for containers - am2.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 0, 1); - AllocateResponse alloc2Response = am2.schedule(); // send the request - - // kick the scheduler, 1 GB and 3 GB given to AM1 and AM2, remaining 0 - nm1.nodeHeartbeat(true); - while (alloc1Response.getAllocatedContainers().size() < 1) { - LOG.info("Waiting for containers to be created for app 1..."); - Thread.sleep(1000); - alloc1Response = am1.schedule(); - } - while (alloc2Response.getAllocatedContainers().size() < 1) { - LOG.info("Waiting for containers to be created for app 2..."); - Thread.sleep(1000); - alloc2Response = am2.schedule(); - } - // kick the scheduler, nothing given remaining 2 GB. - nm2.nodeHeartbeat(true); - - List allocated1 = alloc1Response.getAllocatedContainers(); - Assert.assertEquals(1, allocated1.size()); - Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory()); - Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId()); - - List allocated2 = alloc2Response.getAllocatedContainers(); - Assert.assertEquals(1, allocated2.size()); - Assert.assertEquals(3 * GB, allocated2.get(0).getResource().getMemory()); - Assert.assertEquals(nm1.getNodeId(), allocated2.get(0).getNodeId()); - - report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); - report_nm2 = rm.getResourceScheduler().getNodeReport(nm2.getNodeId()); - Assert.assertEquals(0, report_nm1.getAvailableResource().getMemory()); - Assert.assertEquals(2 * GB, report_nm2.getAvailableResource().getMemory()); - - Assert.assertEquals(6 * GB, report_nm1.getUsedResource().getMemory()); - Assert.assertEquals(2 * GB, report_nm2.getUsedResource().getMemory()); - - Container c1 = allocated1.get(0); - Assert.assertEquals(GB, c1.getResource().getMemory()); - ContainerStatus containerStatus = BuilderUtils.newContainerStatus( - c1.getId(), ContainerState.COMPLETE, "", 0); - nm1.containerStatus(containerStatus); - int waitCount = 0; - while (attempt1.getJustFinishedContainers().size() < 1 - && waitCount++ != 20) { - LOG.info("Waiting for containers to be finished for app 1... Tried " - + waitCount + " times already.."); - Thread.sleep(1000); - } - Assert.assertEquals(1, attempt1.getJustFinishedContainers().size()); - Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size()); - report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); - Assert.assertEquals(5 * GB, report_nm1.getUsedResource().getMemory()); - - rm.stop(); - } - - @Test - public void testNodeUpdateBeforeAppAttemptInit() throws Exception { - FifoScheduler scheduler = new FifoScheduler(); - MockRM rm = new MockRM(conf); - scheduler.setRMContext(rm.getRMContext()); - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, rm.getRMContext()); - - RMNode node = MockNodes.newNodeInfo(1, - Resources.createResource(1024, 4), 1, "127.0.0.1"); - scheduler.handle(new NodeAddedSchedulerEvent(node)); - - ApplicationId appId = ApplicationId.newInstance(0, 1); - scheduler.addApplication(appId, "queue1", "user1", false); - - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); - try { - scheduler.handle(updateEvent); - } catch (NullPointerException e) { - Assert.fail(); - } - - ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1); - scheduler.addApplicationAttempt(attId, false, false); - - rm.stop(); - } - - private void testMinimumAllocation(YarnConfiguration conf, int testAlloc) - throws Exception { - MockRM rm = new MockRM(conf); - rm.start(); - - // Register node1 - MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); - - // Submit an application - RMApp app1 = rm.submitApp(testAlloc); - - // kick the scheduling - nm1.nodeHeartbeat(true); - RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); - MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); - am1.registerAppAttempt(); - SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( - nm1.getNodeId()); - - int checkAlloc = - conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); - Assert.assertEquals(checkAlloc, report_nm1.getUsedResource().getMemory()); - - rm.stop(); - } - - @Test - public void testDefaultMinimumAllocation() throws Exception { - // Test with something lesser than default - testMinimumAllocation( - new YarnConfiguration(TestFifoScheduler.conf), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB / 2); - } - - @Test - public void testNonDefaultMinimumAllocation() throws Exception { - // Set custom min-alloc to test tweaking it - int allocMB = 1536; - YarnConfiguration conf = new YarnConfiguration(TestFifoScheduler.conf); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, allocMB); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - allocMB * 10); - // Test for something lesser than this. - testMinimumAllocation(conf, allocMB / 2); - } - - @Test (timeout = 50000) - public void testReconnectedNode() throws Exception { - CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); - conf.setQueues("default", new String[] {"default"}); - conf.setCapacity("default", 100); - FifoScheduler fs = new FifoScheduler(); - fs.init(conf); - fs.start(); - // mock rmContext to avoid NPE. - RMContext context = mock(RMContext.class); - fs.reinitialize(conf, null); - fs.setRMContext(context); - - RMNode n1 = - MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2"); - RMNode n2 = - MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2, "127.0.0.3"); - - fs.handle(new NodeAddedSchedulerEvent(n1)); - fs.handle(new NodeAddedSchedulerEvent(n2)); - fs.handle(new NodeUpdateSchedulerEvent(n1)); - Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB()); - - // reconnect n1 with downgraded memory - n1 = - MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1, "127.0.0.2"); - fs.handle(new NodeRemovedSchedulerEvent(n1)); - fs.handle(new NodeAddedSchedulerEvent(n1)); - fs.handle(new NodeUpdateSchedulerEvent(n1)); - - Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB()); - fs.stop(); - } - - @Test (timeout = 50000) - public void testBlackListNodes() throws Exception { - - Configuration conf = new Configuration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, - ResourceScheduler.class); - MockRM rm = new MockRM(conf); - rm.start(); - FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler(); - - int rack_num_0 = 0; - int rack_num_1 = 1; - // Add 4 nodes in 2 racks - - // host_0_0 in rack0 - String host_0_0 = "127.0.0.1"; - RMNode n1 = - MockNodes.newNodeInfo(rack_num_0, MockNodes.newResource(4 * GB), 1, host_0_0); - fs.handle(new NodeAddedSchedulerEvent(n1)); - - // host_0_1 in rack0 - String host_0_1 = "127.0.0.2"; - RMNode n2 = - MockNodes.newNodeInfo(rack_num_0, MockNodes.newResource(4 * GB), 1, host_0_1); - fs.handle(new NodeAddedSchedulerEvent(n2)); - - // host_1_0 in rack1 - String host_1_0 = "127.0.0.3"; - RMNode n3 = - MockNodes.newNodeInfo(rack_num_1, MockNodes.newResource(4 * GB), 1, host_1_0); - fs.handle(new NodeAddedSchedulerEvent(n3)); - - // host_1_1 in rack1 - String host_1_1 = "127.0.0.4"; - RMNode n4 = - MockNodes.newNodeInfo(rack_num_1, MockNodes.newResource(4 * GB), 1, host_1_1); - fs.handle(new NodeAddedSchedulerEvent(n4)); - - // Add one application - ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); - ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( - appId1, 1); - createMockRMApp(appAttemptId1, rm.getRMContext()); - - SchedulerEvent appEvent = - new AppAddedSchedulerEvent(appId1, "queue", "user"); - fs.handle(appEvent); - SchedulerEvent attemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId1, false); - fs.handle(attemptEvent); - - List emptyId = new ArrayList(); - List emptyAsk = new ArrayList(); - - // Allow rack-locality for rack_1, but blacklist host_1_0 - - // Set up resource requests - // Ask for a 1 GB container for app 1 - List ask1 = new ArrayList(); - ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), - "rack1", BuilderUtils.newResource(GB, 1), 1)); - ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), - ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); - fs.allocate(appAttemptId1, ask1, emptyId, Collections.singletonList(host_1_0), null); - - // Trigger container assignment - fs.handle(new NodeUpdateSchedulerEvent(n3)); - - // Get the allocation for the application and verify no allocation on blacklist node - Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); - - Assert.assertEquals("allocation1", 0, allocation1.getContainers().size()); - - // verify host_1_1 can get allocated as not in blacklist - fs.handle(new NodeUpdateSchedulerEvent(n4)); - Allocation allocation2 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); - Assert.assertEquals("allocation2", 1, allocation2.getContainers().size()); - List containerList = allocation2.getContainers(); - for (Container container : containerList) { - Assert.assertEquals("Container is allocated on n4", - container.getNodeId(), n4.getNodeID()); - } - - // Ask for a 1 GB container again for app 1 - List ask2 = new ArrayList(); - // this time, rack0 is also in blacklist, so only host_1_1 is available to - // be assigned - ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), - ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); - fs.allocate(appAttemptId1, ask2, emptyId, Collections.singletonList("rack0"), null); - - // verify n1 is not qualified to be allocated - fs.handle(new NodeUpdateSchedulerEvent(n1)); - Allocation allocation3 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); - Assert.assertEquals("allocation3", 0, allocation3.getContainers().size()); - - // verify n2 is not qualified to be allocated - fs.handle(new NodeUpdateSchedulerEvent(n2)); - Allocation allocation4 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); - Assert.assertEquals("allocation4", 0, allocation4.getContainers().size()); - - // verify n3 is not qualified to be allocated - fs.handle(new NodeUpdateSchedulerEvent(n3)); - Allocation allocation5 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); - Assert.assertEquals("allocation5", 0, allocation5.getContainers().size()); - - fs.handle(new NodeUpdateSchedulerEvent(n4)); - Allocation allocation6 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); - Assert.assertEquals("allocation6", 1, allocation6.getContainers().size()); - - containerList = allocation6.getContainers(); - for (Container container : containerList) { - Assert.assertEquals("Container is allocated on n4", - container.getNodeId(), n4.getNodeID()); - } - - rm.stop(); - } - - @Test (timeout = 50000) - public void testHeadroom() throws Exception { - - Configuration conf = new Configuration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, - ResourceScheduler.class); - MockRM rm = new MockRM(conf); - rm.start(); - FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler(); - - // Add a node - RMNode n1 = - MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2"); - fs.handle(new NodeAddedSchedulerEvent(n1)); - - // Add two applications - ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); - ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( - appId1, 1); - createMockRMApp(appAttemptId1, rm.getRMContext()); - SchedulerEvent appEvent = - new AppAddedSchedulerEvent(appId1, "queue", "user"); - fs.handle(appEvent); - SchedulerEvent attemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId1, false); - fs.handle(attemptEvent); - - ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2); - ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId( - appId2, 1); - createMockRMApp(appAttemptId2, rm.getRMContext()); - SchedulerEvent appEvent2 = - new AppAddedSchedulerEvent(appId2, "queue", "user"); - fs.handle(appEvent2); - SchedulerEvent attemptEvent2 = - new AppAttemptAddedSchedulerEvent(appAttemptId2, false); - fs.handle(attemptEvent2); - - List emptyId = new ArrayList(); - List emptyAsk = new ArrayList(); - - // Set up resource requests - - // Ask for a 1 GB container for app 1 - List ask1 = new ArrayList(); - ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), - ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); - fs.allocate(appAttemptId1, ask1, emptyId, null, null); - - // Ask for a 2 GB container for app 2 - List ask2 = new ArrayList(); - ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), - ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1)); - fs.allocate(appAttemptId2, ask2, emptyId, null, null); - - // Trigger container assignment - fs.handle(new NodeUpdateSchedulerEvent(n1)); - - // Get the allocation for the applications and verify headroom - Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); - Assert.assertEquals("Allocation headroom", 1 * GB, - allocation1.getResourceLimit().getMemory()); - - Allocation allocation2 = fs.allocate(appAttemptId2, emptyAsk, emptyId, null, null); - Assert.assertEquals("Allocation headroom", 1 * GB, - allocation2.getResourceLimit().getMemory()); - - rm.stop(); - } - - @Test - public void testResourceOverCommit() throws Exception { - MockRM rm = new MockRM(conf); - rm.start(); - - MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB); - - RMApp app1 = rm.submitApp(2048); - // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1 - nm1.nodeHeartbeat(true); - RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); - MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); - am1.registerAppAttempt(); - SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( - nm1.getNodeId()); - // check node report, 2 GB used and 2 GB available - Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory()); - Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemory()); - - // add request for containers - am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1); - AllocateResponse alloc1Response = am1.schedule(); // send the request - - // kick the scheduler, 2 GB given to AM1, resource remaining 0 - nm1.nodeHeartbeat(true); - while (alloc1Response.getAllocatedContainers().size() < 1) { - LOG.info("Waiting for containers to be created for app 1..."); - Thread.sleep(1000); - alloc1Response = am1.schedule(); - } - - List allocated1 = alloc1Response.getAllocatedContainers(); - Assert.assertEquals(1, allocated1.size()); - Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory()); - Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId()); - - report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); - // check node report, 4 GB used and 0 GB available - Assert.assertEquals(0, report_nm1.getAvailableResource().getMemory()); - Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory()); - - // check container is assigned with 2 GB. - Container c1 = allocated1.get(0); - Assert.assertEquals(2 * GB, c1.getResource().getMemory()); - - // update node resource to 2 GB, so resource is over-consumed. - Map nodeResourceMap = - new HashMap(); - nodeResourceMap.put(nm1.getNodeId(), - ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1)); - UpdateNodeResourceRequest request = - UpdateNodeResourceRequest.newInstance(nodeResourceMap); - AdminService as = rm.adminService; - as.updateNodeResource(request); - - // Now, the used resource is still 4 GB, and available resource is minus value. - report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); - Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory()); - Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemory()); - - // Check container can complete successfully in case of resource over-commitment. - ContainerStatus containerStatus = BuilderUtils.newContainerStatus( - c1.getId(), ContainerState.COMPLETE, "", 0); - nm1.containerStatus(containerStatus); - int waitCount = 0; - while (attempt1.getJustFinishedContainers().size() < 1 - && waitCount++ != 20) { - LOG.info("Waiting for containers to be finished for app 1... Tried " - + waitCount + " times already.."); - Thread.sleep(100); - } - Assert.assertEquals(1, attempt1.getJustFinishedContainers().size()); - Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size()); - report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); - Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory()); - // As container return 2 GB back, the available resource becomes 0 again. - Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory()); - rm.stop(); - } - - public static void main(String[] args) throws Exception { - TestFifoScheduler t = new TestFifoScheduler(); - t.test(); - t.testDefaultMinimumAllocation(); - t.testNonDefaultMinimumAllocation(); - t.testReconnectedNode(); - } - - private RMAppImpl createMockRMApp(ApplicationAttemptId attemptId, - RMContext context) { - RMAppImpl app = mock(RMAppImpl.class); - when(app.getApplicationId()).thenReturn(attemptId.getApplicationId()); - RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class); - when(attempt.getAppAttemptId()).thenReturn(attemptId); - RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class); - when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); - when(app.getCurrentAppAttempt()).thenReturn(attempt); - context.getRMApps().putIfAbsent(attemptId.getApplicationId(), app); - return app; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 0ad295774b5..fe61eabc973 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -68,7 +67,8 @@ import com.google.common.collect.Sets; public class TestContainerAllocation { - private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); + private static final Log LOG = LogFactory + .getLog(TestContainerAllocation.class); private final int GB = 1024; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index b662ef15b26..5fd6c924249 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -27,6 +28,7 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,9 +36,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -47,9 +53,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.Application; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -58,21 +68,27 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -80,6 +96,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManag import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -90,6 +109,7 @@ public class TestFifoScheduler { private final int GB = 1024; private ResourceManager resourceManager = null; + private static Configuration conf; private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -97,7 +117,7 @@ public class TestFifoScheduler { @Before public void setUp() throws Exception { resourceManager = new ResourceManager(); - Configuration conf = new Configuration(); + conf = new Configuration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); resourceManager.init(conf); @@ -566,46 +586,6 @@ public class TestFifoScheduler { LOG.info("--- END: testFifoScheduler ---"); } - @Test - public void testBlackListNodes() throws Exception { - Configuration conf = new Configuration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, - ResourceScheduler.class); - MockRM rm = new MockRM(conf); - rm.start(); - FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler(); - - String host = "127.0.0.1"; - RMNode node = - MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); - fs.handle(new NodeAddedSchedulerEvent(node)); - - ApplicationId appId = BuilderUtils.newApplicationId(100, 1); - ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( - appId, 1); - - createMockRMApp(appAttemptId, rm.getRMContext()); - - SchedulerEvent appEvent = - new AppAddedSchedulerEvent(appId, "default", - "user"); - fs.handle(appEvent); - SchedulerEvent attemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - fs.handle(attemptEvent); - - // Verify the blacklist can be updated independent of requesting containers - fs.allocate(appAttemptId, Collections.emptyList(), - Collections.emptyList(), - Collections.singletonList(host), null); - Assert.assertTrue(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host)); - fs.allocate(appAttemptId, Collections.emptyList(), - Collections.emptyList(), null, - Collections.singletonList(host)); - Assert.assertFalse(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host)); - rm.stop(); - } - @Test public void testGetAppsInQueue() throws Exception { Application application_0 = new Application("user_0", resourceManager); @@ -638,6 +618,542 @@ public class TestFifoScheduler { fs.getSchedulerApplications(), fs, "queue"); } + @Test(timeout = 30000) + public void testConfValidation() throws Exception { + FifoScheduler scheduler = new FifoScheduler(); + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); + try { + scheduler.serviceInit(conf); + fail("Exception is expected because the min memory allocation is" + + " larger than the max memory allocation."); + } catch (YarnRuntimeException e) { + // Exception is expected. + assertTrue("The thrown exception is not the expected one.", e + .getMessage().startsWith("Invalid resource scheduler memory")); + } + } + + @Test(timeout = 60000) + public void testAllocateContainerOnNodeWithoutOffSwitchSpecified() + throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + + MockRM rm = new MockRM(conf); + rm.start(); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + RMApp app1 = rm.submitApp(2048); + // kick the scheduling, 2 GB given to AM1, remaining 4GB on nm1 + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + // add request for containers + List requests = new ArrayList(); + requests.add(am1.createResourceReq("127.0.0.1", 1 * GB, 1, 1)); + requests.add(am1.createResourceReq("/default-rack", 1 * GB, 1, 1)); + am1.allocate(requests, null); // send the request + + try { + // kick the schedule + nm1.nodeHeartbeat(true); + } catch (NullPointerException e) { + Assert.fail("NPE when allocating container on node but " + + "forget to set off-switch request should be handled"); + } + rm.stop(); + } + + @Test(timeout = 60000) + public void testFifoScheduling() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + MockRM rm = new MockRM(conf); + rm.start(); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB); + + RMApp app1 = rm.submitApp(2048); + // kick the scheduling, 2 GB given to AM1, remaining 4GB on nm1 + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + SchedulerNodeReport report_nm1 = + rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory()); + + RMApp app2 = rm.submitApp(2048); + // kick the scheduling, 2GB given to AM, remaining 2 GB on nm2 + nm2.nodeHeartbeat(true); + RMAppAttempt attempt2 = app2.getCurrentAppAttempt(); + MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId()); + am2.registerAppAttempt(); + SchedulerNodeReport report_nm2 = + rm.getResourceScheduler().getNodeReport(nm2.getNodeId()); + Assert.assertEquals(2 * GB, report_nm2.getUsedResource().getMemory()); + + // add request for containers + am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + // add request for containers + am2.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 0, 1); + AllocateResponse alloc2Response = am2.schedule(); // send the request + + // kick the scheduler, 1 GB and 3 GB given to AM1 and AM2, remaining 0 + nm1.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(1000); + alloc1Response = am1.schedule(); + } + while (alloc2Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 2..."); + Thread.sleep(1000); + alloc2Response = am2.schedule(); + } + // kick the scheduler, nothing given remaining 2 GB. + nm2.nodeHeartbeat(true); + + List allocated1 = alloc1Response.getAllocatedContainers(); + Assert.assertEquals(1, allocated1.size()); + Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory()); + Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId()); + + List allocated2 = alloc2Response.getAllocatedContainers(); + Assert.assertEquals(1, allocated2.size()); + Assert.assertEquals(3 * GB, allocated2.get(0).getResource().getMemory()); + Assert.assertEquals(nm1.getNodeId(), allocated2.get(0).getNodeId()); + + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + report_nm2 = rm.getResourceScheduler().getNodeReport(nm2.getNodeId()); + Assert.assertEquals(0, report_nm1.getAvailableResource().getMemory()); + Assert.assertEquals(2 * GB, report_nm2.getAvailableResource().getMemory()); + + Assert.assertEquals(6 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(2 * GB, report_nm2.getUsedResource().getMemory()); + + Container c1 = allocated1.get(0); + Assert.assertEquals(GB, c1.getResource().getMemory()); + ContainerStatus containerStatus = + BuilderUtils.newContainerStatus(c1.getId(), ContainerState.COMPLETE, + "", 0); + nm1.containerStatus(containerStatus); + int waitCount = 0; + while (attempt1.getJustFinishedContainers().size() < 1 && waitCount++ != 20) { + LOG.info("Waiting for containers to be finished for app 1... Tried " + + waitCount + " times already.."); + Thread.sleep(1000); + } + Assert.assertEquals(1, attempt1.getJustFinishedContainers().size()); + Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses() + .size()); + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(5 * GB, report_nm1.getUsedResource().getMemory()); + + rm.stop(); + } + + @Test(timeout = 60000) + public void testNodeUpdateBeforeAppAttemptInit() throws Exception { + FifoScheduler scheduler = new FifoScheduler(); + MockRM rm = new MockRM(conf); + scheduler.setRMContext(rm.getRMContext()); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, rm.getRMContext()); + + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(1024, 4), 1, + "127.0.0.1"); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + ApplicationId appId = ApplicationId.newInstance(0, 1); + scheduler.addApplication(appId, "queue1", "user1", false); + + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + try { + scheduler.handle(updateEvent); + } catch (NullPointerException e) { + Assert.fail(); + } + + ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1); + scheduler.addApplicationAttempt(attId, false, false); + + rm.stop(); + } + + private void testMinimumAllocation(YarnConfiguration conf, int testAlloc) + throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(testAlloc); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + SchedulerNodeReport report_nm1 = + rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + + int checkAlloc = + conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + Assert.assertEquals(checkAlloc, report_nm1.getUsedResource().getMemory()); + + rm.stop(); + } + + @Test(timeout = 60000) + public void testDefaultMinimumAllocation() throws Exception { + // Test with something lesser than default + testMinimumAllocation(new YarnConfiguration(TestFifoScheduler.conf), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB / 2); + } + + @Test(timeout = 60000) + public void testNonDefaultMinimumAllocation() throws Exception { + // Set custom min-alloc to test tweaking it + int allocMB = 1536; + YarnConfiguration conf = new YarnConfiguration(TestFifoScheduler.conf); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, allocMB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + allocMB * 10); + // Test for something lesser than this. + testMinimumAllocation(conf, allocMB / 2); + } + + @Test(timeout = 50000) + public void testReconnectedNode() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.setQueues("default", new String[] { "default" }); + conf.setCapacity("default", 100); + FifoScheduler fs = new FifoScheduler(); + fs.init(conf); + fs.start(); + // mock rmContext to avoid NPE. + RMContext context = mock(RMContext.class); + fs.reinitialize(conf, null); + fs.setRMContext(context); + + RMNode n1 = + MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2"); + RMNode n2 = + MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2, "127.0.0.3"); + + fs.handle(new NodeAddedSchedulerEvent(n1)); + fs.handle(new NodeAddedSchedulerEvent(n2)); + fs.handle(new NodeUpdateSchedulerEvent(n1)); + Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB()); + + // reconnect n1 with downgraded memory + n1 = + MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1, "127.0.0.2"); + fs.handle(new NodeRemovedSchedulerEvent(n1)); + fs.handle(new NodeAddedSchedulerEvent(n1)); + fs.handle(new NodeUpdateSchedulerEvent(n1)); + + Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB()); + fs.stop(); + } + + @Test(timeout = 50000) + public void testBlackListNodes() throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler(); + + int rack_num_0 = 0; + int rack_num_1 = 1; + // Add 4 nodes in 2 racks + + // host_0_0 in rack0 + String host_0_0 = "127.0.0.1"; + RMNode n1 = + MockNodes.newNodeInfo(rack_num_0, MockNodes.newResource(4 * GB), 1, + host_0_0); + fs.handle(new NodeAddedSchedulerEvent(n1)); + + // host_0_1 in rack0 + String host_0_1 = "127.0.0.2"; + RMNode n2 = + MockNodes.newNodeInfo(rack_num_0, MockNodes.newResource(4 * GB), 1, + host_0_1); + fs.handle(new NodeAddedSchedulerEvent(n2)); + + // host_1_0 in rack1 + String host_1_0 = "127.0.0.3"; + RMNode n3 = + MockNodes.newNodeInfo(rack_num_1, MockNodes.newResource(4 * GB), 1, + host_1_0); + fs.handle(new NodeAddedSchedulerEvent(n3)); + + // host_1_1 in rack1 + String host_1_1 = "127.0.0.4"; + RMNode n4 = + MockNodes.newNodeInfo(rack_num_1, MockNodes.newResource(4 * GB), 1, + host_1_1); + fs.handle(new NodeAddedSchedulerEvent(n4)); + + // Add one application + ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId1 = + BuilderUtils.newApplicationAttemptId(appId1, 1); + createMockRMApp(appAttemptId1, rm.getRMContext()); + + SchedulerEvent appEvent = + new AppAddedSchedulerEvent(appId1, "queue", "user"); + fs.handle(appEvent); + SchedulerEvent attemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId1, false); + fs.handle(attemptEvent); + + List emptyId = new ArrayList(); + List emptyAsk = new ArrayList(); + + // Allow rack-locality for rack_1, but blacklist host_1_0 + + // Set up resource requests + // Ask for a 1 GB container for app 1 + List ask1 = new ArrayList(); + ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), + "rack1", BuilderUtils.newResource(GB, 1), 1)); + ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), + ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); + fs.allocate(appAttemptId1, ask1, emptyId, + Collections.singletonList(host_1_0), null); + + // Trigger container assignment + fs.handle(new NodeUpdateSchedulerEvent(n3)); + + // Get the allocation for the application and verify no allocation on + // blacklist node + Allocation allocation1 = + fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); + + Assert.assertEquals("allocation1", 0, allocation1.getContainers().size()); + + // verify host_1_1 can get allocated as not in blacklist + fs.handle(new NodeUpdateSchedulerEvent(n4)); + Allocation allocation2 = + fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); + Assert.assertEquals("allocation2", 1, allocation2.getContainers().size()); + List containerList = allocation2.getContainers(); + for (Container container : containerList) { + Assert.assertEquals("Container is allocated on n4", + container.getNodeId(), n4.getNodeID()); + } + + // Ask for a 1 GB container again for app 1 + List ask2 = new ArrayList(); + // this time, rack0 is also in blacklist, so only host_1_1 is available to + // be assigned + ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), + ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); + fs.allocate(appAttemptId1, ask2, emptyId, + Collections.singletonList("rack0"), null); + + // verify n1 is not qualified to be allocated + fs.handle(new NodeUpdateSchedulerEvent(n1)); + Allocation allocation3 = + fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); + Assert.assertEquals("allocation3", 0, allocation3.getContainers().size()); + + // verify n2 is not qualified to be allocated + fs.handle(new NodeUpdateSchedulerEvent(n2)); + Allocation allocation4 = + fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); + Assert.assertEquals("allocation4", 0, allocation4.getContainers().size()); + + // verify n3 is not qualified to be allocated + fs.handle(new NodeUpdateSchedulerEvent(n3)); + Allocation allocation5 = + fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); + Assert.assertEquals("allocation5", 0, allocation5.getContainers().size()); + + fs.handle(new NodeUpdateSchedulerEvent(n4)); + Allocation allocation6 = + fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); + Assert.assertEquals("allocation6", 1, allocation6.getContainers().size()); + + containerList = allocation6.getContainers(); + for (Container container : containerList) { + Assert.assertEquals("Container is allocated on n4", + container.getNodeId(), n4.getNodeID()); + } + + rm.stop(); + } + + @Test(timeout = 50000) + public void testHeadroom() throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler(); + + // Add a node + RMNode n1 = + MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2"); + fs.handle(new NodeAddedSchedulerEvent(n1)); + + // Add two applications + ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId1 = + BuilderUtils.newApplicationAttemptId(appId1, 1); + createMockRMApp(appAttemptId1, rm.getRMContext()); + SchedulerEvent appEvent = + new AppAddedSchedulerEvent(appId1, "queue", "user"); + fs.handle(appEvent); + SchedulerEvent attemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId1, false); + fs.handle(attemptEvent); + + ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2); + ApplicationAttemptId appAttemptId2 = + BuilderUtils.newApplicationAttemptId(appId2, 1); + createMockRMApp(appAttemptId2, rm.getRMContext()); + SchedulerEvent appEvent2 = + new AppAddedSchedulerEvent(appId2, "queue", "user"); + fs.handle(appEvent2); + SchedulerEvent attemptEvent2 = + new AppAttemptAddedSchedulerEvent(appAttemptId2, false); + fs.handle(attemptEvent2); + + List emptyId = new ArrayList(); + List emptyAsk = new ArrayList(); + + // Set up resource requests + + // Ask for a 1 GB container for app 1 + List ask1 = new ArrayList(); + ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), + ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); + fs.allocate(appAttemptId1, ask1, emptyId, null, null); + + // Ask for a 2 GB container for app 2 + List ask2 = new ArrayList(); + ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), + ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1)); + fs.allocate(appAttemptId2, ask2, emptyId, null, null); + + // Trigger container assignment + fs.handle(new NodeUpdateSchedulerEvent(n1)); + + // Get the allocation for the applications and verify headroom + Allocation allocation1 = + fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); + Assert.assertEquals("Allocation headroom", 1 * GB, allocation1 + .getResourceLimit().getMemory()); + + Allocation allocation2 = + fs.allocate(appAttemptId2, emptyAsk, emptyId, null, null); + Assert.assertEquals("Allocation headroom", 1 * GB, allocation2 + .getResourceLimit().getMemory()); + + rm.stop(); + } + + @Test(timeout = 60000) + public void testResourceOverCommit() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB); + + RMApp app1 = rm.submitApp(2048); + // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1 + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + SchedulerNodeReport report_nm1 = + rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + // check node report, 2 GB used and 2 GB available + Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemory()); + + // add request for containers + am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + // kick the scheduler, 2 GB given to AM1, resource remaining 0 + nm1.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(1000); + alloc1Response = am1.schedule(); + } + + List allocated1 = alloc1Response.getAllocatedContainers(); + Assert.assertEquals(1, allocated1.size()); + Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory()); + Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId()); + + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + // check node report, 4 GB used and 0 GB available + Assert.assertEquals(0, report_nm1.getAvailableResource().getMemory()); + Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory()); + + // check container is assigned with 2 GB. + Container c1 = allocated1.get(0); + Assert.assertEquals(2 * GB, c1.getResource().getMemory()); + + // update node resource to 2 GB, so resource is over-consumed. + Map nodeResourceMap = + new HashMap(); + nodeResourceMap.put(nm1.getNodeId(), + ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1)); + UpdateNodeResourceRequest request = + UpdateNodeResourceRequest.newInstance(nodeResourceMap); + rm.getAdminService().updateNodeResource(request); + + // Now, the used resource is still 4 GB, and available resource is minus + // value. + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemory()); + + // Check container can complete successfully in case of resource + // over-commitment. + ContainerStatus containerStatus = + BuilderUtils.newContainerStatus(c1.getId(), ContainerState.COMPLETE, + "", 0); + nm1.containerStatus(containerStatus); + int waitCount = 0; + while (attempt1.getJustFinishedContainers().size() < 1 && waitCount++ != 20) { + LOG.info("Waiting for containers to be finished for app 1... Tried " + + waitCount + " times already.."); + Thread.sleep(100); + } + Assert.assertEquals(1, attempt1.getJustFinishedContainers().size()); + Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses() + .size()); + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory()); + // As container return 2 GB back, the available resource becomes 0 again. + Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory()); + rm.stop(); + } + private void checkApplicationResourceUsage(int expected, Application application) { Assert.assertEquals(expected, application.getUsedResources().getMemory());