YARN-1290. Let continuous scheduling achieve more balanced task assignment (Wei Yan via Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1537731 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9aa2f51812
commit
30007fd686
|
@ -77,6 +77,9 @@ Release 2.3.0 - UNRELEASED
|
||||||
applications so that clients can get information about them post RM-restart.
|
applications so that clients can get information about them post RM-restart.
|
||||||
(Jian He via vinodkv)
|
(Jian He via vinodkv)
|
||||||
|
|
||||||
|
YARN-1290. Let continuous scheduling achieve more balanced task assignment
|
||||||
|
(Wei Yan via Sandy Ryza)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -181,6 +181,8 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
|
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
|
||||||
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
|
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
|
||||||
protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
|
protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
|
||||||
|
private Comparator nodeAvailableResourceComparator =
|
||||||
|
new NodeAvailableResourceComparator(); // Node available resource comparator
|
||||||
protected double nodeLocalityThreshold; // Cluster threshold for node locality
|
protected double nodeLocalityThreshold; // Cluster threshold for node locality
|
||||||
protected double rackLocalityThreshold; // Cluster threshold for rack locality
|
protected double rackLocalityThreshold; // Cluster threshold for rack locality
|
||||||
protected long nodeLocalityDelayMs; // Delay for node locality
|
protected long nodeLocalityDelayMs; // Delay for node locality
|
||||||
|
@ -948,14 +950,22 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
|
|
||||||
private void continuousScheduling() {
|
private void continuousScheduling() {
|
||||||
while (true) {
|
while (true) {
|
||||||
for (FSSchedulerNode node : nodes.values()) {
|
List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
|
||||||
try {
|
Collections.sort(nodeIdList, nodeAvailableResourceComparator);
|
||||||
if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
|
|
||||||
attemptScheduling(node);
|
// iterate all nodes
|
||||||
|
for (NodeId nodeId : nodeIdList) {
|
||||||
|
if (nodes.containsKey(nodeId)) {
|
||||||
|
FSSchedulerNode node = nodes.get(nodeId);
|
||||||
|
try {
|
||||||
|
if (Resources.fitsIn(minimumAllocation,
|
||||||
|
node.getAvailableResource())) {
|
||||||
|
attemptScheduling(node);
|
||||||
|
}
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
LOG.warn("Error while attempting scheduling for node " + node +
|
||||||
|
": " + ex.toString(), ex);
|
||||||
}
|
}
|
||||||
} catch (Throwable ex) {
|
|
||||||
LOG.warn("Error while attempting scheduling for node " + node + ": " +
|
|
||||||
ex.toString(), ex);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
@ -967,6 +977,17 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Sort nodes by available resource */
|
||||||
|
private class NodeAvailableResourceComparator implements Comparator<NodeId> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(NodeId n1, NodeId n2) {
|
||||||
|
return RESOURCE_CALCULATOR.compare(clusterCapacity,
|
||||||
|
nodes.get(n2).getAvailableResource(),
|
||||||
|
nodes.get(n1).getAvailableResource());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized void attemptScheduling(FSSchedulerNode node) {
|
private synchronized void attemptScheduling(FSSchedulerNode node) {
|
||||||
// Assign new containers...
|
// Assign new containers...
|
||||||
// 1. Check for reserved applications
|
// 1. Check for reserved applications
|
||||||
|
|
|
@ -33,6 +33,9 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -53,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
@ -2348,7 +2352,7 @@ public class TestFairScheduler {
|
||||||
fs.applications, FSSchedulerApp.class);
|
fs.applications, FSSchedulerApp.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 5000)
|
@Test (timeout = 10000)
|
||||||
public void testContinuousScheduling() throws Exception {
|
public void testContinuousScheduling() throws Exception {
|
||||||
// set continuous scheduling enabled
|
// set continuous scheduling enabled
|
||||||
FairScheduler fs = new FairScheduler();
|
FairScheduler fs = new FairScheduler();
|
||||||
|
@ -2359,16 +2363,21 @@ public class TestFairScheduler {
|
||||||
Assert.assertTrue("Continuous scheduling should be enabled.",
|
Assert.assertTrue("Continuous scheduling should be enabled.",
|
||||||
fs.isContinuousSchedulingEnabled());
|
fs.isContinuousSchedulingEnabled());
|
||||||
|
|
||||||
// Add one node
|
// Add two nodes
|
||||||
RMNode node1 =
|
RMNode node1 =
|
||||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||||
"127.0.0.1");
|
"127.0.0.1");
|
||||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
fs.handle(nodeEvent1);
|
fs.handle(nodeEvent1);
|
||||||
|
RMNode node2 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
|
||||||
|
"127.0.0.2");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||||
|
fs.handle(nodeEvent2);
|
||||||
|
|
||||||
// available resource
|
// available resource
|
||||||
Assert.assertEquals(fs.getClusterCapacity().getMemory(), 8 * 1024);
|
Assert.assertEquals(fs.getClusterCapacity().getMemory(), 16 * 1024);
|
||||||
Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 8);
|
Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 16);
|
||||||
|
|
||||||
// send application request
|
// send application request
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
|
@ -2391,6 +2400,28 @@ public class TestFairScheduler {
|
||||||
// check consumption
|
// check consumption
|
||||||
Assert.assertEquals(1024, app.getCurrentConsumption().getMemory());
|
Assert.assertEquals(1024, app.getCurrentConsumption().getMemory());
|
||||||
Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores());
|
Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores());
|
||||||
|
|
||||||
|
// another request
|
||||||
|
request =
|
||||||
|
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
|
||||||
|
ask.clear();
|
||||||
|
ask.add(request);
|
||||||
|
fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
|
||||||
|
|
||||||
|
// Wait until app gets resources
|
||||||
|
while (app.getCurrentConsumption()
|
||||||
|
.equals(Resources.createResource(1024, 1))) { }
|
||||||
|
|
||||||
|
Assert.assertEquals(2048, app.getCurrentConsumption().getMemory());
|
||||||
|
Assert.assertEquals(2, app.getCurrentConsumption().getVirtualCores());
|
||||||
|
|
||||||
|
// 2 containers should be assigned to 2 nodes
|
||||||
|
Set<NodeId> nodes = new HashSet<NodeId>();
|
||||||
|
Iterator<RMContainer> it = app.getLiveContainers().iterator();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
nodes.add(it.next().getContainer().getNodeId());
|
||||||
|
}
|
||||||
|
Assert.assertEquals(2, nodes.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue