YARN-412. Fixed FifoScheduler to check hostname of a NodeManager rather than its host:port during scheduling which caused incorrect locality for containers. Contributed by Roger Hoover.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1467244 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-04-12 12:12:08 +00:00
parent 41c4cd08a0
commit c2592021f3
4 changed files with 99 additions and 4 deletions

View File

@ -249,6 +249,10 @@ Release 2.0.4-alpha - UNRELEASED
YARN-470. Support a way to disable resource monitoring on the NodeManager. YARN-470. Support a way to disable resource monitoring on the NodeManager.
(Siddharth Seth via hitesh) (Siddharth Seth via hitesh)
YARN-412. Fixed FifoScheduler to check hostname of a NodeManager rather
than its host:port during scheduling which caused incorrect locality for
containers. (Roger Hoover via acmurthy)
Release 2.0.3-alpha - 2013-02-06 Release 2.0.3-alpha - 2013-02-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -462,7 +462,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
FiCaSchedulerApp application, Priority priority) { FiCaSchedulerApp application, Priority priority) {
int assignedContainers = 0; int assignedContainers = 0;
ResourceRequest request = ResourceRequest request =
application.getResourceRequest(priority, node.getRMNode().getNodeAddress()); application.getResourceRequest(priority, node.getHostName());
if (request != null) { if (request != null) {
// Don't allocate on this node if we don't need containers on this rack // Don't allocate on this node if we don't need containers on this rack
ResourceRequest rackRequest = ResourceRequest rackRequest =

View File

@ -209,6 +209,7 @@ public class MockNodes {
final String rackName = "rack"+ rack; final String rackName = "rack"+ rack;
final int nid = hostnum; final int nid = hostnum;
final String hostName = "host"+ nid; final String hostName = "host"+ nid;
final String nodeAddr = hostName + ":" + nid;
final int port = 123; final int port = 123;
final NodeId nodeID = newNodeID(hostName, port); final NodeId nodeID = newNodeID(hostName, port);
final String httpAddress = httpAddr; final String httpAddress = httpAddr;
@ -218,7 +219,7 @@ public class MockNodes {
nodeHealthStatus.setIsNodeHealthy(true); nodeHealthStatus.setIsNodeHealthy(true);
nodeHealthStatus.setHealthReport("HealthyMe"); nodeHealthStatus.setHealthReport("HealthyMe");
} }
return new MockRMNodeImpl(nodeID, hostName, httpAddress, perNode, rackName, return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName,
nodeHealthStatus, nid, hostName, state); nodeHealthStatus, nid, hostName, state);
} }

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert; import junit.framework.Assert;
@ -28,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -35,15 +38,22 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.Application; import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After; import org.junit.After;
@ -55,6 +65,9 @@ public class TestFifoScheduler {
private ResourceManager resourceManager = null; private ResourceManager resourceManager = null;
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
resourceManager = new ResourceManager(); resourceManager = new ResourceManager();
@ -78,14 +91,38 @@ public class TestFifoScheduler {
.getRMContext()); .getRMContext());
} }
@Test private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationAttemptId attId = recordFactory
.newRecordInstance(ApplicationAttemptId.class);
ApplicationId appIdImpl = recordFactory
.newRecordInstance(ApplicationId.class);
appIdImpl.setId(appId);
attId.setAttemptId(attemptId);
attId.setApplicationId(appIdImpl);
return attId;
}
private ResourceRequest createResourceRequest(int memory, String host,
int priority, int numContainers) {
ResourceRequest request = recordFactory
.newRecordInstance(ResourceRequest.class);
request.setCapability(Resources.createResource(memory));
request.setHostName(host);
request.setNumContainers(numContainers);
Priority prio = recordFactory.newRecordInstance(Priority.class);
prio.setPriority(priority);
request.setPriority(prio);
return request;
}
@Test(timeout=5000)
public void testFifoSchedulerCapacityWhenNoNMs() { public void testFifoSchedulerCapacityWhenNoNMs() {
FifoScheduler scheduler = new FifoScheduler(); FifoScheduler scheduler = new FifoScheduler();
QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false); QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity()); Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity());
} }
@Test @Test(timeout=5000)
public void testAppAttemptMetrics() throws Exception { public void testAppAttemptMetrics() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher(); AsyncDispatcher dispatcher = new InlineDispatcher();
RMContext rmContext = new RMContextImpl(dispatcher, null, RMContext rmContext = new RMContextImpl(dispatcher, null,
@ -111,6 +148,59 @@ public class TestFifoScheduler {
Assert.assertEquals(1, metrics.getAppsSubmitted()); Assert.assertEquals(1, metrics.getAppsSubmitted());
} }
@Test(timeout=2000)
public void testNodeLocalAssignment() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, null, null);
FifoScheduler scheduler = new FifoScheduler();
scheduler.reinitialize(new Configuration(), rmContext);
RMNode node0 = MockNodes.newNodeInfo(1,
Resources.createResource(1024 * 64), 1234);
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0);
scheduler.handle(nodeEvent1);
int _appId = 1;
int _appAttemptId = 1;
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
_appAttemptId);
AppAddedSchedulerEvent appEvent1 = new AppAddedSchedulerEvent(appAttemptId,
"queue1", "user1");
scheduler.handle(appEvent1);
int memory = 64;
int nConts = 3;
int priority = 20;
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest nodeLocal = createResourceRequest(memory,
node0.getHostName(), priority, nConts);
ResourceRequest rackLocal = createResourceRequest(memory,
node0.getRackName(), priority, nConts);
ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority,
nConts);
ask.add(nodeLocal);
ask.add(rackLocal);
ask.add(any);
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>());
NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
// Before the node update event, there are 3 local requests outstanding
Assert.assertEquals(3, nodeLocal.getNumContainers());
scheduler.handle(node0Update);
// After the node update event, check that there are no more local requests
// outstanding
Assert.assertEquals(0, nodeLocal.getNumContainers());
//Also check that the containers were scheduled
SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId);
Assert.assertEquals(3, info.getLiveContainers().size());
}
// @Test // @Test
public void testFifoScheduler() throws Exception { public void testFifoScheduler() throws Exception {