From ac7d455a0fab3e49f2b03be5882e026f2d746188 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Fri, 12 Apr 2013 12:15:17 +0000 Subject: [PATCH] Merge -c 1467244 from trunk to branch-2 to fix YARN-412. Fixed FifoScheduler to check hostname of a NodeManager rather than its host:port during scheduling which caused incorrect locality for containers. Contributed by Roger Hoover. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1467245 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 4 + .../scheduler/fifo/FifoScheduler.java | 2 +- .../server/resourcemanager/MockNodes.java | 3 +- .../scheduler/fifo/TestFifoScheduler.java | 94 ++++++++++++++++++- 4 files changed, 99 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3d33a48aea2..efbaece4d08 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -188,6 +188,10 @@ Release 2.0.4-alpha - UNRELEASED YARN-470. Support a way to disable resource monitoring on the NodeManager. (Siddharth Seth via hitesh) + YARN-412. Fixed FifoScheduler to check hostname of a NodeManager rather + than its host:port during scheduling which caused incorrect locality for + containers. (Roger Hoover via acmurthy) + Release 2.0.3-alpha - 2013-02-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 2dbb498f9ba..d5a542700f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -462,7 +462,7 @@ private int assignNodeLocalContainers(FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = - application.getResourceRequest(priority, node.getRMNode().getNodeAddress()); + application.getResourceRequest(priority, node.getHostName()); if (request != null) { // Don't allocate on this node if we don't need containers on this rack ResourceRequest rackRequest = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index c3fe72d99cb..ae6d5814a46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -209,6 +209,7 @@ private static RMNode buildRMNode(int rack, final Resource perNode, NodeState st final String rackName = "rack"+ rack; final int nid = hostnum; final String hostName = "host"+ nid; + final String nodeAddr = hostName + ":" + nid; final int port = 123; final NodeId nodeID = newNodeID(hostName, port); final String httpAddress = httpAddr; @@ -218,7 +219,7 @@ private static RMNode buildRMNode(int rack, final Resource perNode, NodeState st nodeHealthStatus.setIsNodeHealthy(true); nodeHealthStatus.setHealthReport("HealthyMe"); } - return new MockRMNodeImpl(nodeID, hostName, httpAddress, perNode, rackName, + return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName, nodeHealthStatus, nid, hostName, state); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 8f45f4535cb..85076baaef3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import junit.framework.Assert; @@ -28,6 +30,7 @@ import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; @@ -35,15 +38,22 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.Application; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.After; @@ -55,6 +65,9 @@ public class TestFifoScheduler { private ResourceManager resourceManager = null; + private static final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + @Before public void setUp() throws Exception { resourceManager = new ResourceManager(); @@ -78,14 +91,38 @@ public void tearDown() throws Exception { .getRMContext()); } - @Test + private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { + ApplicationAttemptId attId = recordFactory + .newRecordInstance(ApplicationAttemptId.class); + ApplicationId appIdImpl = recordFactory + .newRecordInstance(ApplicationId.class); + appIdImpl.setId(appId); + attId.setAttemptId(attemptId); + attId.setApplicationId(appIdImpl); + return attId; + } + + private ResourceRequest createResourceRequest(int memory, String host, + int priority, int numContainers) { + ResourceRequest request = recordFactory + .newRecordInstance(ResourceRequest.class); + request.setCapability(Resources.createResource(memory)); + request.setHostName(host); + request.setNumContainers(numContainers); + Priority prio = recordFactory.newRecordInstance(Priority.class); + prio.setPriority(priority); + request.setPriority(prio); + return request; + } + + @Test(timeout=5000) public void testFifoSchedulerCapacityWhenNoNMs() { FifoScheduler scheduler = new FifoScheduler(); QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false); Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity()); } - @Test + @Test(timeout=5000) public void testAppAttemptMetrics() throws Exception { AsyncDispatcher dispatcher = new InlineDispatcher(); RMContext rmContext = new RMContextImpl(dispatcher, null, @@ -111,6 +148,59 @@ public void testAppAttemptMetrics() throws Exception { Assert.assertEquals(1, metrics.getAppsSubmitted()); } + @Test(timeout=2000) + public void testNodeLocalAssignment() throws Exception { + AsyncDispatcher dispatcher = new InlineDispatcher(); + RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, + null, null, null); + + FifoScheduler scheduler = new FifoScheduler(); + scheduler.reinitialize(new Configuration(), rmContext); + + RMNode node0 = MockNodes.newNodeInfo(1, + Resources.createResource(1024 * 64), 1234); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0); + scheduler.handle(nodeEvent1); + + int _appId = 1; + int _appAttemptId = 1; + ApplicationAttemptId appAttemptId = createAppAttemptId(_appId, + _appAttemptId); + AppAddedSchedulerEvent appEvent1 = new AppAddedSchedulerEvent(appAttemptId, + "queue1", "user1"); + scheduler.handle(appEvent1); + + int memory = 64; + int nConts = 3; + int priority = 20; + + List ask = new ArrayList(); + ResourceRequest nodeLocal = createResourceRequest(memory, + node0.getHostName(), priority, nConts); + ResourceRequest rackLocal = createResourceRequest(memory, + node0.getRackName(), priority, nConts); + ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority, + nConts); + ask.add(nodeLocal); + ask.add(rackLocal); + ask.add(any); + scheduler.allocate(appAttemptId, ask, new ArrayList()); + + NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); + + // Before the node update event, there are 3 local requests outstanding + Assert.assertEquals(3, nodeLocal.getNumContainers()); + + scheduler.handle(node0Update); + + // After the node update event, check that there are no more local requests + // outstanding + Assert.assertEquals(0, nodeLocal.getNumContainers()); + //Also check that the containers were scheduled + SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId); + Assert.assertEquals(3, info.getLiveContainers().size()); + } + // @Test public void testFifoScheduler() throws Exception {