YARN-10380: Import logic of multi-node allocation in CapacityScheduler (#2494)

Contributed by Qi Zhu.
Author: zhuqi, 2020-12-09 19:48:39 +08:00 (committed by GitHub)
Parent: aaf9e3d320
Commit: d67ccd03e3
3 changed files with 115 additions and 31 deletions
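
At a glance: ClusterNodeTracker gains a getPartitions() accessor; CapacityScheduler.schedule() iterates over partitions instead of individual nodes when multiNodePlacementEnabled is set, feeding each partition's candidate set to allocateContainersToNode(); a new getCandidateNodeSet(String partition) overload builds that set; and TestCapacitySchedulerAsyncScheduling adds an async-allocation test using ResourceUsageMultiNodeLookupPolicy.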

File: ClusterNodeTracker.java

@@ -496,4 +496,15 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
    }
    return nodesPerPartition;
  }

  public List<String> getPartitions() {
    List<String> partitions = null;
    readLock.lock();
    try {
      partitions = new ArrayList<>(nodesPerLabel.keySet());
    } finally {
      readLock.unlock();
    }
    return partitions;
  }
}
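
For context, the new accessor copies the label set while holding the read lock, so callers get a stable snapshot they can iterate lock-free afterwards. Below is a minimal self-contained sketch of that pattern; PartitionTracker and its fields are invented for illustration and are not part of the patch.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Invented stand-in for ClusterNodeTracker, reduced to the snapshot idiom.
public class PartitionTracker {
  private final Map<String, Set<String>> nodesPerLabel = new HashMap<>();
  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  public void addNode(String partition, String nodeId) {
    lock.writeLock().lock();
    try {
      nodesPerLabel.computeIfAbsent(partition, p -> new HashSet<>())
          .add(nodeId);
    } finally {
      lock.writeLock().unlock();
    }
  }

  public List<String> getPartitions() {
    lock.readLock().lock();
    try {
      // Copy under the read lock; the returned list is a snapshot that
      // stays valid even if partitions are added or removed afterwards.
      return new ArrayList<>(nodesPerLabel.keySet());
    } finally {
      lock.readLock().unlock();
    }
  }
}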

File: CapacityScheduler.java

@@ -531,6 +531,8 @@ public class CapacityScheduler extends
  /**
   * Schedule on all nodes by starting at a random point.
   * Schedule on all partitions by starting at a random partition
   * when multiNodePlacementEnabled is true.
   * @param cs the CapacityScheduler to schedule on
   */
  static void schedule(CapacityScheduler cs) throws InterruptedException {
@@ -544,44 +546,79 @@ public class CapacityScheduler extends
    if (nodeSize == 0) {
      return;
    }

    if (!cs.multiNodePlacementEnabled) {
      int start = random.nextInt(nodeSize);

      // To avoid too verbose DEBUG logging, only print debug log once for
      // every 10 secs.
      boolean printSkipedNodeLogging = false;
      if (Time.monotonicNow() / 1000 % 10 == 0) {
        printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
      } else {
        printedVerboseLoggingForAsyncScheduling = false;
      }

      // Allocate containers of node [start, end)
      for (FiCaSchedulerNode node : nodes) {
        if (current++ >= start) {
          if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
            continue;
          }
          cs.allocateContainersToNode(node.getNodeID(), false);
        }
      }

      current = 0;

      // Allocate containers of node [0, start)
      for (FiCaSchedulerNode node : nodes) {
        if (current++ > start) {
          break;
        }
        if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
          continue;
        }
        cs.allocateContainersToNode(node.getNodeID(), false);
      }

      if (printSkipedNodeLogging) {
        printedVerboseLoggingForAsyncScheduling = true;
      }
    } else {
      // Get all partitions
      List<String> partitions = cs.nodeTracker.getPartitions();
      int partitionSize = partitions.size();
      // First randomize the start point
      int start = random.nextInt(partitionSize);
      // Allocate containers of partition [start, end)
      for (String partition : partitions) {
        if (current++ >= start) {
          CandidateNodeSet<FiCaSchedulerNode> candidates =
              cs.getCandidateNodeSet(partition);
          if (candidates == null) {
            continue;
          }
          cs.allocateContainersToNode(candidates, false);
        }
      }

      current = 0;

      // Allocate containers of partition [0, start)
      for (String partition : partitions) {
        if (current++ > start) {
          break;
        }
        CandidateNodeSet<FiCaSchedulerNode> candidates =
            cs.getCandidateNodeSet(partition);
        if (candidates == null) {
          continue;
        }
        cs.allocateContainersToNode(candidates, false);
      }
    }
    Thread.sleep(cs.getAsyncScheduleInterval());
  }
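
The multi-node branch reuses the traversal idiom of the per-node path: pick a random start index, visit [start, end), then wrap around to the front. Here is a self-contained sketch of that idiom, with names invented for illustration (the patch applies it to FiCaSchedulerNode and partition lists):

import java.util.List;
import java.util.Random;
import java.util.function.Consumer;

public final class RandomStartScan {
  private static final Random RANDOM = new Random();

  // Visits each element exactly once, starting at a random offset so
  // repeated passes do not always favor the head of the list.
  static <T> void scan(List<T> items, Consumer<T> visitor) {
    if (items.isEmpty()) {
      return;
    }
    int start = RANDOM.nextInt(items.size());
    int current = 0;
    // First pass: [start, end)
    for (T item : items) {
      if (current++ >= start) {
        visitor.accept(item);
      }
    }
    current = 0;
    // Second pass: [0, start)
    for (T item : items) {
      if (current++ >= start) {
        break;
      }
      visitor.accept(item);
    }
  }

  public static void main(String[] args) {
    scan(List.of("a", "b", "c", "d"), System.out::println);
  }
}

Note that the patch's second loop breaks on current++ > start, so the element at the start index is visited in both passes; the sketch above breaks on >= to visit each element exactly once.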
@@ -1486,17 +1523,34 @@ public class CapacityScheduler extends
  }

  private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
      String partition) {
    CandidateNodeSet<FiCaSchedulerNode> candidates = null;
    Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
    List<FiCaSchedulerNode> nodes = nodeTracker
        .getNodesPerPartition(partition);
    if (nodes != null && !nodes.isEmpty()) {
      // Filter out nodes whose heartbeat has been missing for too long.
      nodes.stream()
          .filter(node -> !shouldSkipNodeSchedule(node, this, true))
          .forEach(n -> nodesByPartition.put(n.getNodeID(), n));
      candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
          nodesByPartition, partition);
    }
    return candidates;
  }

  private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
      FiCaSchedulerNode node) {
    CandidateNodeSet<FiCaSchedulerNode> candidates = null;
    candidates = new SimpleCandidateNodeSet<>(node);
    if (multiNodePlacementEnabled) {
      Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
      List<FiCaSchedulerNode> nodes = nodeTracker
          .getNodesPerPartition(node.getPartition());
      if (nodes != null && !nodes.isEmpty()) {
        nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
        candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
            nodesByPartition, node.getPartition());
      }
    }
    return candidates;
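
A note on the asymmetry between the two overloads above: the partition-based variant filters out nodes flagged by shouldSkipNodeSchedule() (e.g. nodes whose heartbeat has gone stale), presumably because the async multi-node pass is not driven by heartbeats and would otherwise keep offering dead nodes; the per-node variant needs no such filter, since it is reached from a live heartbeat of that very node. The partition variant can also return null when a partition has no usable nodes, which is why the schedule() loop above null-checks each candidate set before calling allocateContainersToNode().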
@@ -1513,8 +1567,8 @@ public class CapacityScheduler extends
    int offswitchCount = 0;
    int assignedContainers = 0;

    CandidateNodeSet<FiCaSchedulerNode> candidates =
        getCandidateNodeSet(node);
    CSAssignment assignment = allocateContainersToNode(candidates,
        withNodeHeartbeat);
    // Only check if we can allocate more containers on the same node when

File: TestCapacitySchedulerAsyncScheduling.java

@@ -85,6 +85,10 @@ public class TestCapacitySchedulerAsyncScheduling {
  private NMHeartbeatThread nmHeartbeatThread = null;

  private static final String POLICY_CLASS_NAME =
      "org.apache.hadoop.yarn.server.resourcemanager.scheduler" +
          ".placement.ResourceUsageMultiNodeLookupPolicy";

  @Before
  public void setUp() throws Exception {
    conf = new YarnConfiguration();
@@ -111,6 +115,21 @@ public class TestCapacitySchedulerAsyncScheduling {
    testAsyncContainerAllocation(3);
  }

  @Test(timeout = 300000)
  public void testAsyncContainerAllocationWithMultiNode() throws Exception {
    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
        "resource-based");
    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
        "resource-based");
    String policyName =
        CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
            + ".resource-based" + ".class";
    conf.set(policyName, POLICY_CLASS_NAME);
    conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED,
        true);
    testAsyncContainerAllocation(2);
  }

  public void testAsyncContainerAllocation(int numThreads) throws Exception {
    conf.setInt(
        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
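
For reference, the test's settings map onto capacity-scheduler configuration keys. The literal keys below are my reading of the CapacitySchedulerConfiguration constants used in the test (including the assumed <policy-prefix>.<name>.class layout); verify them against your Hadoop version before relying on them.

import org.apache.hadoop.conf.Configuration;

public class MultiNodePlacementConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Register a sorting policy named "resource-based" and select it
    // (keys assumed to match MULTI_NODE_SORTING_POLICIES and
    // MULTI_NODE_SORTING_POLICY_NAME above).
    conf.set("yarn.scheduler.capacity.multi-node-sorting.policy.names",
        "resource-based");
    conf.set("yarn.scheduler.capacity.multi-node-sorting.policy",
        "resource-based");
    // Bind the policy name to its implementing class, using the assumed
    // <policy-prefix>.<name>.class key layout from the test.
    conf.set("yarn.scheduler.capacity.multi-node-sorting.policy"
            + ".resource-based.class",
        "org.apache.hadoop.yarn.server.resourcemanager.scheduler"
            + ".placement.ResourceUsageMultiNodeLookupPolicy");
    // Turn on multi-node placement so schedule() takes the partition path.
    conf.setBoolean("yarn.scheduler.capacity.multi-node-placement-enabled",
        true);
    System.out.println("multi-node placement enabled: "
        + conf.getBoolean(
            "yarn.scheduler.capacity.multi-node-placement-enabled", false));
  }
}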