Merge -c r1381868 from trunk to branch-2 to fix YARN-80. Add support for delaying rack-local containers in CapacityScheduler. Contributed by Arun C. Murthy.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1381873 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b9a3f51dcd
commit
69d9fa75bc
|
@ -44,6 +44,9 @@ Release 2.1.0-alpha - Unreleased
|
||||||
YARN-10. Fix DistributedShell module to not have a dependency on
|
YARN-10. Fix DistributedShell module to not have a dependency on
|
||||||
hadoop-mapreduce-client-core. (Hitesh Shah via vinodkv)
|
hadoop-mapreduce-client-core. (Hitesh Shah via vinodkv)
|
||||||
|
|
||||||
|
YARN-80. Add support for delaying rack-local containers in
|
||||||
|
CapacityScheduler. (acmurthy)
|
||||||
|
|
||||||
OPTIMAZATIONS
|
OPTIMAZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -115,6 +115,13 @@ public class CapacitySchedulerConfiguration extends Configuration {
|
||||||
@Private
|
@Private
|
||||||
public static final String ROOT = "root";
|
public static final String ROOT = "root";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String NODE_LOCALITY_DELAY =
|
||||||
|
PREFIX + "node-locality-delay";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final int DEFAULT_NODE_LOCALITY_DELAY = -1;
|
||||||
|
|
||||||
public CapacitySchedulerConfiguration() {
|
public CapacitySchedulerConfiguration() {
|
||||||
this(new Configuration());
|
this(new Configuration());
|
||||||
}
|
}
|
||||||
|
@ -290,4 +297,9 @@ public class CapacitySchedulerConfiguration extends Configuration {
|
||||||
public boolean getEnableUserMetrics() {
|
public boolean getEnableUserMetrics() {
|
||||||
return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
|
return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getNodeLocalityDelay() {
|
||||||
|
int delay = getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
|
||||||
|
return (delay == DEFAULT_NODE_LOCALITY_DELAY) ? 0 : delay;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,6 +124,8 @@ public class LeafQueue implements CSQueue {
|
||||||
|
|
||||||
private final ActiveUsersManager activeUsersManager;
|
private final ActiveUsersManager activeUsersManager;
|
||||||
|
|
||||||
|
private final int nodeLocalityDelay;
|
||||||
|
|
||||||
public LeafQueue(CapacitySchedulerContext cs,
|
public LeafQueue(CapacitySchedulerContext cs,
|
||||||
String queueName, CSQueue parent,
|
String queueName, CSQueue parent,
|
||||||
Comparator<FiCaSchedulerApp> applicationComparator, CSQueue old) {
|
Comparator<FiCaSchedulerApp> applicationComparator, CSQueue old) {
|
||||||
|
@ -188,6 +190,9 @@ public class LeafQueue implements CSQueue {
|
||||||
Map<QueueACL, AccessControlList> acls =
|
Map<QueueACL, AccessControlList> acls =
|
||||||
cs.getConfiguration().getAcls(getQueuePath());
|
cs.getConfiguration().getAcls(getQueuePath());
|
||||||
|
|
||||||
|
this.nodeLocalityDelay =
|
||||||
|
cs.getConfiguration().getNodeLocalityDelay();
|
||||||
|
|
||||||
setupQueueConfigs(
|
setupQueueConfigs(
|
||||||
cs.getClusterResources(),
|
cs.getClusterResources(),
|
||||||
capacity, absoluteCapacity,
|
capacity, absoluteCapacity,
|
||||||
|
@ -528,6 +533,11 @@ public class LeafQueue implements CSQueue {
|
||||||
return Collections.singletonList(userAclInfo);
|
return Collections.singletonList(userAclInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public int getNodeLocalityDelay() {
|
||||||
|
return nodeLocalityDelay;
|
||||||
|
}
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return queueName + ": " +
|
return queueName + ": " +
|
||||||
"capacity=" + capacity + ", " +
|
"capacity=" + capacity + ", " +
|
||||||
|
@ -1095,7 +1105,7 @@ public class LeafQueue implements CSQueue {
|
||||||
reservedContainer)) {
|
reservedContainer)) {
|
||||||
return assignContainer(clusterResource, node, application, priority, request,
|
return assignContainer(clusterResource, node, application, priority, request,
|
||||||
NodeType.RACK_LOCAL, reservedContainer);
|
NodeType.RACK_LOCAL, reservedContainer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Resources.none();
|
return Resources.none();
|
||||||
}
|
}
|
||||||
|
@ -1112,7 +1122,6 @@ public class LeafQueue implements CSQueue {
|
||||||
NodeType.OFF_SWITCH, reservedContainer);
|
NodeType.OFF_SWITCH, reservedContainer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return Resources.none();
|
return Resources.none();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1147,7 +1156,12 @@ public class LeafQueue implements CSQueue {
|
||||||
|
|
||||||
// If we are here, we do need containers on this rack for RACK_LOCAL req
|
// If we are here, we do need containers on this rack for RACK_LOCAL req
|
||||||
if (type == NodeType.RACK_LOCAL) {
|
if (type == NodeType.RACK_LOCAL) {
|
||||||
return true;
|
// 'Delay' rack-local just a little bit...
|
||||||
|
long missedOpportunities = application.getSchedulingOpportunities(priority);
|
||||||
|
return (
|
||||||
|
Math.min(scheduler.getNumClusterNodes(), getNodeLocalityDelay()) <
|
||||||
|
missedOpportunities
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if we need containers on this host
|
// Check if we need containers on this host
|
||||||
|
|
|
@ -92,4 +92,15 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.scheduler.capacity.node-locality-delay</name>
|
||||||
|
<value>-1</value>
|
||||||
|
<description>
|
||||||
|
Number of missed scheduling opportunities after which the CapacityScheduler
|
||||||
|
attempts to schedule rack-local containers.
|
||||||
|
Typically this should be set to number of racks in the cluster, this
|
||||||
|
feature is disabled by default, set to -1.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
</configuration>
|
</configuration>
|
||||||
|
|
|
@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
|
@ -1288,19 +1289,29 @@ public class TestLeafQueue {
|
||||||
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
|
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
|
||||||
priority, recordFactory));
|
priority, recordFactory));
|
||||||
app_0_requests_0.add(
|
app_0_requests_0.add(
|
||||||
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // one extra
|
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, // one extra
|
||||||
priority, recordFactory));
|
priority, recordFactory));
|
||||||
app_0.updateResourceRequests(app_0_requests_0);
|
app_0.updateResourceRequests(app_0_requests_0);
|
||||||
assertEquals(1, app_0.getTotalRequiredResources(priority));
|
assertEquals(2, app_0.getTotalRequiredResources(priority));
|
||||||
|
|
||||||
String host_3 = "host_3"; // on rack_1
|
String host_3 = "host_3"; // on rack_1
|
||||||
FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
|
FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
|
||||||
|
|
||||||
|
// Rack-delay
|
||||||
|
doReturn(1).when(a).getNodeLocalityDelay();
|
||||||
|
|
||||||
|
// Shouldn't assign RACK_LOCAL yet
|
||||||
|
assignment = a.assignContainers(clusterResource, node_3);
|
||||||
|
assertEquals(1, app_0.getSchedulingOpportunities(priority));
|
||||||
|
assertEquals(2, app_0.getTotalRequiredResources(priority));
|
||||||
|
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
|
||||||
|
|
||||||
|
// Should assign RACK_LOCAL now
|
||||||
assignment = a.assignContainers(clusterResource, node_3);
|
assignment = a.assignContainers(clusterResource, node_3);
|
||||||
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
|
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
|
||||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||||
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
|
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
|
||||||
assertEquals(0, app_0.getTotalRequiredResources(priority));
|
assertEquals(1, app_0.getTotalRequiredResources(priority));
|
||||||
assertEquals(NodeType.RACK_LOCAL, assignment.getType());
|
assertEquals(NodeType.RACK_LOCAL, assignment.getType());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue