From e8ce7cd541e113d9b2749770a36d0e7b63485a40 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 5 Sep 2013 01:21:18 +0000 Subject: [PATCH] YARN-957. Fixed a bug in CapacityScheduler because of which requests that need more than a node's total capability were incorrectly allocated on that node causing apps to hang. Contributed by Omkar Vinit Joshi. svn merge --ignore-ancestry -c 1520187 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1520188 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 4 + .../scheduler/SchedulerNode.java | 5 + .../scheduler/capacity/LeafQueue.java | 8 +- .../common/fica/FiCaSchedulerNode.java | 9 ++ .../scheduler/fair/FSSchedulerNode.java | 9 ++ .../yarn/server/resourcemanager/MockRM.java | 8 ++ .../capacity/TestContainerAllocation.java | 109 ++++++++++++++++++ 7 files changed, 151 insertions(+), 1 deletion(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 964a03cd1f5..b10e346774c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -137,6 +137,10 @@ Release 2.1.1-beta - UNRELEASED YARN-1077. Fixed TestContainerLaunch test failure on Windows. (Chuan Liu via vinodkv) + YARN-957. Fixed a bug in CapacityScheduler because of which requests that + need more than a node's total capability were incorrectly allocated on that + node causing apps to hang. (Omkar Vinit Joshi via vinodkv) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 8a80bf8cf9a..2974b9dc05a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -67,4 +67,9 @@ public abstract class SchedulerNode { */ public abstract int getNumContainers(); + /** + * Get total resources on the node. + * @return total resources on the node. + */ + public abstract Resource getTotalResource(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 41b3f5e3037..8624ec0e87d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1308,9 +1308,15 @@ public class LeafQueue implements CSQueue { + " request=" + request + " type=" + type); } Resource capability = request.getCapability(); - Resource available = node.getAvailableResource(); + Resource totalResource = node.getTotalResource(); + if (!Resources.fitsIn(capability, totalResource)) { + LOG.warn("Node : " + node.getNodeID() + + " does not have sufficient resource for request : " + request + + " node total capability : " + node.getTotalResource()); + return Resources.none(); + } assert Resources.greaterThan( resourceCalculator, clusterResource, available, Resources.none()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 7a306ec4281..400b3153dcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -49,6 +49,7 @@ public class FiCaSchedulerNode extends SchedulerNode { private Resource availableResource = recordFactory.newRecordInstance(Resource.class); private Resource usedResource = recordFactory.newRecordInstance(Resource.class); + private Resource totalResourceCapability; private volatile int numContainers; @@ -65,6 +66,9 @@ public class FiCaSchedulerNode extends SchedulerNode { this.rmNode = node; this.availableResource.setMemory(node.getTotalCapability().getMemory()); this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores()); + totalResourceCapability = + Resource.newInstance(node.getTotalCapability().getMemory(), node + .getTotalCapability().getVirtualCores()); if (usePortForNodeName) { nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); } else { @@ -126,6 +130,11 @@ public class FiCaSchedulerNode extends SchedulerNode { return this.usedResource; } + @Override + public Resource getTotalResource() { + return this.totalResourceCapability; + } + private synchronized boolean isValidContainer(Container c) { if (launchedContainers.containsKey(c.getId())) return true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index bd29f821bb4..d84547a3ffb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -52,6 +52,7 @@ public class FSSchedulerNode extends SchedulerNode { private Resource availableResource; private Resource usedResource = recordFactory.newRecordInstance(Resource.class); + private Resource totalResourceCapability; private volatile int numContainers; @@ -68,6 +69,9 @@ public class FSSchedulerNode extends SchedulerNode { public FSSchedulerNode(RMNode node, boolean usePortForNodeName) { this.rmNode = node; this.availableResource = Resources.clone(node.getTotalCapability()); + totalResourceCapability = + Resource.newInstance(node.getTotalCapability().getMemory(), node + .getTotalCapability().getVirtualCores()); if (usePortForNodeName) { nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); } else { @@ -173,6 +177,11 @@ public class FSSchedulerNode extends SchedulerNode { Resources.subtractFrom(usedResource, resource); } + @Override + public Resource getTotalResource() { + return this.totalResourceCapability; + } + private synchronized void deductAvailableResource(Resource resource) { if (resource == null) { LOG.error("Invalid deduction of null resource for " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 3e2a8906a98..0f6f8a1fd7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -232,6 +232,14 @@ public class MockRM extends ResourceManager { return nm; } + public MockNM registerNode(String nodeIdStr, int memory, int vCores) + throws Exception { + MockNM nm = + new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService()); + nm.registerNode(); + return nm; + } + public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java new file mode 100644 index 00000000000..b877fbbf98f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -0,0 +1,109 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.junit.Test; + + +public class TestContainerAllocation { + + private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); + + private final int GB = 1024; + + @Test(timeout = 3000000) + public void testExcessReservationThanNodeManagerCapacity() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 2 * GB, 4); + MockNM nm2 = rm.registerNode("127.0.0.1:2234", 3 * GB, 4); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + // wait.. + int waitCount = 20; + int size = rm.getRMContext().getRMNodes().size(); + while ((size = rm.getRMContext().getRMNodes().size()) != 2 + && waitCount-- > 0) { + LOG.info("Waiting for node managers to register : " + size); + Thread.sleep(100); + } + Assert.assertEquals(2, rm.getRMContext().getRMNodes().size()); + // Submit an application + RMApp app1 = rm.submitApp(128); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + LOG.info("sending container requests "); + am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + // kick the scheduler + nm1.nodeHeartbeat(true); + int waitCounter = 20; + LOG.info("heartbeating nm1"); + while (alloc1Response.getAllocatedContainers().size() < 1 + && waitCounter-- > 0) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(500); + alloc1Response = am1.schedule(); + } + LOG.info("received container : " + + alloc1Response.getAllocatedContainers().size()); + + // No container should be allocated. + // Internally it should not been reserved. + Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 0); + + LOG.info("heartbeating nm2"); + waitCounter = 20; + nm2.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1 + && waitCounter-- > 0) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(500); + alloc1Response = am1.schedule(); + } + LOG.info("received container : " + + alloc1Response.getAllocatedContainers().size()); + Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 1); + + rm.stop(); + } +} \ No newline at end of file