MAPREDUCE-3005. Fix both FifoScheduler and CapacityScheduler to correctly enforce locality constraints.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1170879 13f79535-47bb-0310-9956-ffa450edef68
parent 4ba2acf336
commit 5183e88109
CHANGES.txt
@@ -1324,6 +1324,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2949. Fixed NodeManager to shut-down correctly if a service
     startup fails. (Ravi Teja via vinodkv)
 
+    MAPREDUCE-3005. Fix both FifoScheduler and CapacityScheduler to correctly
+    enforce locality constraints. (acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
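The bug in both schedulers was the same: a rack-local or node-local container could be handed out even after the application's outstanding demand at an enclosing level (the rack, or ANY for off-switch) had already dropped to zero. Below is a minimal, self-contained sketch of the invariant the patch enforces; LocalityGate, request(), count(), and the map bookkeeping are illustrative assumptions for this note, not the YARN API.

import java.util.HashMap;
import java.util.Map;

class LocalityGate {
  // Outstanding container counts keyed by resource name:
  // a hostname, a rack name, or "*" for ANY (off-switch).
  private final Map<String, Integer> outstanding = new HashMap<String, Integer>();

  void request(String resourceName, int delta) {
    outstanding.merge(resourceName, delta, Integer::sum);
  }

  private int count(String resourceName) {
    return outstanding.getOrDefault(resourceName, 0);
  }

  // A node-local container needs live demand at every enclosing level;
  // a rack-local container needs live rack and ANY demand.
  boolean canAssignNodeLocal(String host, String rack) {
    return count("*") > 0 && count(rack) > 0 && count(host) > 0;
  }

  boolean canAssignRackLocal(String rack) {
    return count("*") > 0 && count(rack) > 0;
  }
}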
LeafQueue.java
@@ -1023,21 +1023,17 @@ public class LeafQueue implements CSQueue {
     // Check if we need containers on this rack
     ResourceRequest rackLocalRequest =
       application.getResourceRequest(priority, node.getRackName());
+    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
+      return false;
+    }
 
     // If we are here, we do need containers on this rack for RACK_LOCAL req
     if (type == NodeType.RACK_LOCAL) {
-      if (rackLocalRequest == null) {
-        return false;
-      } else {
-        return rackLocalRequest.getNumContainers() > 0;
-      }
+      return true;
     }
 
     // Check if we need containers on this host
     if (type == NodeType.NODE_LOCAL) {
-      // First: Do we need containers on this rack?
-      if (rackLocalRequest != null && rackLocalRequest.getNumContainers() == 0) {
-        return false;
-      }
-
       // Now check if we need containers on this host...
       ResourceRequest nodeLocalRequest =
         application.getResourceRequest(priority, node.getHostName());
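The net effect of the LeafQueue change, shown as one compact predicate: the rack-level check is hoisted so it guards both RACK_LOCAL and NODE_LOCAL, which lets the RACK_LOCAL branch collapse to return true. This is a sketch, not the committed method: NodeType, Req, and CanAssignSketch are stand-ins so it compiles on its own, and the NODE_LOCAL tail is an assumption since the hunk truncates that branch.

enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

interface Req { int getNumContainers(); }

class CanAssignSketch {
  static boolean canAssign(NodeType type, Req rackLocalRequest, Req nodeLocalRequest) {
    // Hoisted check: no live rack-level demand means neither RACK_LOCAL
    // nor NODE_LOCAL containers may be placed here.
    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
      return false;
    }
    if (type == NodeType.RACK_LOCAL) {
      return true; // rack demand was already verified above
    }
    if (type == NodeType.NODE_LOCAL) {
      return nodeLocalRequest != null && nodeLocalRequest.getNumContainers() > 0;
    }
    return false; // OFF_SWITCH is handled earlier in the real method
  }
}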
FifoScheduler.java
@@ -289,6 +289,7 @@ public class FifoScheduler implements ResourceScheduler {
     return nodes.get(nodeId);
   }
 
+  @SuppressWarnings("unchecked")
   private synchronized void addApplication(ApplicationAttemptId appAttemptId,
       String queueName, String user) {
     // TODO: Fix store
@@ -440,6 +441,14 @@ public class FifoScheduler implements ResourceScheduler {
     ResourceRequest request =
       application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
     if (request != null) {
+      // Don't allocate on this node if we don't need containers on this rack
+      ResourceRequest rackRequest =
+          application.getResourceRequest(priority,
+              node.getRMNode().getRackName());
+      if (rackRequest == null || rackRequest.getNumContainers() <= 0) {
+        return 0;
+      }
+
       int assignableContainers =
         Math.min(
             getMaxAllocatableContainers(application, priority, node,
@@ -458,6 +467,13 @@ public class FifoScheduler implements ResourceScheduler {
     ResourceRequest request =
       application.getResourceRequest(priority, node.getRMNode().getRackName());
     if (request != null) {
+      // Don't allocate on this rack if the application doesn't need containers
+      ResourceRequest offSwitchRequest =
+          application.getResourceRequest(priority, SchedulerNode.ANY);
+      if (offSwitchRequest.getNumContainers() <= 0) {
+        return 0;
+      }
+
       int assignableContainers =
         Math.min(
             getMaxAllocatableContainers(application, priority, node,
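In the two FifoScheduler hunks above the same invariant is applied top-down: the node-local assignment path now bails out when the rack-level request is exhausted, and the rack-local path bails out when the ANY (off-switch) request is exhausted. A hypothetical walk-through using the LocalityGate sketch from earlier (GateDemo is illustrative; it assumes LocalityGate is in the same package):

public class GateDemo {
  public static void main(String[] args) {
    LocalityGate gate = new LocalityGate();
    gate.request("host_0_0", 1);
    gate.request("rack_0", 1);
    gate.request("*", 1);

    // Live demand at host, rack, and ANY level: node-local assignment is fine.
    System.out.println(gate.canAssignNodeLocal("host_0_0", "rack_0")); // true

    // A completed node-local assignment decrements every level.
    gate.request("host_0_0", -1);
    gate.request("rack_0", -1);
    gate.request("*", -1);

    // Pre-patch, a stale rack request could still trigger an allocation here;
    // post-patch, the exhausted ANY/rack demand blocks the assignment.
    System.out.println(gate.canAssignRackLocal("rack_0")); // false
  }
}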
TestLeafQueue.java
@@ -625,7 +625,6 @@ public class TestLeafQueue {
   }
 
-
 
   @Test
   public void testLocalityScheduling() throws Exception {
 
@@ -876,6 +875,107 @@ public class TestLeafQueue {
 
   }
 
+  @Test
+  public void testSchedulingConstraints() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // User
+    String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    SchedulerApp app_0 =
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+    a.submitApplication(app_0, user_0, A);
+
+    // Setup some nodes and racks
+    String host_0_0 = "host_0_0";
+    String rack_0 = "rack_0";
+    SchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
+    String host_0_1 = "host_0_1";
+    SchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
+
+
+    String host_1_0 = "host_1_0";
+    String rack_1 = "rack_1";
+    SchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
+
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests and submit
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0_0, 1*GB, 1,
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0_1, 1*GB, 1,
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_0, 1*GB, 1,
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_1_0, 1*GB, 1,
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1,
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // Start testing...
+
+    // Add one request
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // NODE_LOCAL - node_0_0
+    a.assignContainers(clusterResource, node_0_0);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0),
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+    // No allocation on node_1_0 even though it's node/rack local since
+    // required(ANY) == 0
+    a.assignContainers(clusterResource, node_1_0);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
+                                                                 // since #req=0
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+    // Add one request
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // No allocation on node_0_1 even though it's node/rack local since
+    // required(rack_0) == 0
+    a.assignContainers(clusterResource, node_0_1);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(1, app_0.getSchedulingOpportunities(priority));
+    assertEquals(1, app_0.getTotalRequiredResources(priority));
+
+    // NODE_LOCAL - node_1_0
+    a.assignContainers(clusterResource, node_1_0);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+  }
 
   @After
   public void tearDown() throws Exception {
   }