YARN-9448. Fix Opportunistic Scheduling for node local allocations. Contributed by Abhishek Modi.

This commit is contained in:
Inigo Goiri 2019-04-19 09:41:06 -07:00
parent 518f47bf9b
commit aeadb9432f
2 changed files with 50 additions and 36 deletions

View File

@ -318,6 +318,7 @@ public class OpportunisticContainerAllocator {
opportContext.addToOutstandingReqs(oppResourceReqs);
Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
Set<String> allocatedNodes = new HashSet<>();
List<Container> allocatedContainers = new ArrayList<>();
// Satisfy the outstanding OPPORTUNISTIC requests.
@ -335,7 +336,7 @@ public class OpportunisticContainerAllocator {
// the outstanding reqs)
Map<Resource, List<Allocation>> allocation = allocate(
rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
appSubmitter, nodeBlackList);
appSubmitter, nodeBlackList, allocatedNodes);
if (allocation.size() > 0) {
allocations.add(allocation);
continueLoop = true;
@ -357,14 +358,15 @@ public class OpportunisticContainerAllocator {
private Map<Resource, List<Allocation>> allocate(long rmIdentifier,
OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
ApplicationAttemptId appAttId, String userName, Set<String> blackList)
ApplicationAttemptId appAttId, String userName, Set<String> blackList,
Set<String> allocatedNodes)
throws YarnException {
Map<Resource, List<Allocation>> containers = new HashMap<>();
for (EnrichedResourceRequest enrichedAsk :
appContext.getOutstandingOpReqs().get(schedKey).values()) {
allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
appContext.getContainerIdGenerator(), blackList, appAttId,
appContext.getNodeMap(), userName, containers, enrichedAsk);
appContext.getContainerIdGenerator(), blackList, allocatedNodes,
appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk);
ResourceRequest anyAsk = enrichedAsk.getRequest();
if (!containers.isEmpty()) {
LOG.info("Opportunistic allocation requested for [priority={}, "
@ -379,9 +381,9 @@ public class OpportunisticContainerAllocator {
private void allocateContainersInternal(long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
Set<String> blacklist, ApplicationAttemptId id,
Map<String, RemoteNode> allNodes, String userName,
Map<Resource, List<Allocation>> allocations,
Set<String> blacklist, Set<String> allocatedNodes,
ApplicationAttemptId id, Map<String, RemoteNode> allNodes,
String userName, Map<Resource, List<Allocation>> allocations,
EnrichedResourceRequest enrichedAsk)
throws YarnException {
if (allNodes.size() == 0) {
@ -406,7 +408,8 @@ public class OpportunisticContainerAllocator {
}
while (numAllocated < toAllocate) {
Collection<RemoteNode> nodeCandidates =
findNodeCandidates(loopIndex, allNodes, blacklist, enrichedAsk);
findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes,
enrichedAsk);
for (RemoteNode rNode : nodeCandidates) {
String rNodeHost = rNode.getNodeId().getHost();
// Ignore black list
@ -422,6 +425,10 @@ public class OpportunisticContainerAllocator {
} else {
continue;
}
} else if (allocatedNodes.contains(rNodeHost)) {
LOG.info("Opportunistic container has already been allocated on {}.",
rNodeHost);
continue;
}
if (loopIndex == RACK_LOCAL_LOOP) {
if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) {
@ -435,11 +442,7 @@ public class OpportunisticContainerAllocator {
anyAsk, rNode);
numAllocated++;
updateMetrics(loopIndex);
// Try to spread the allocations across the nodes.
// But don't add if it is a node local request.
if (loopIndex != NODE_LOCAL_LOOP) {
blacklist.add(rNode.getNodeId().getHost());
}
allocatedNodes.add(rNodeHost);
LOG.info("Allocated [" + container.getId() + "] as opportunistic at " +
"location [" + location + "]");
if (numAllocated >= toAllocate) {
@ -475,7 +478,7 @@ public class OpportunisticContainerAllocator {
private Collection<RemoteNode> findNodeCandidates(int loopIndex,
Map<String, RemoteNode> allNodes, Set<String> blackList,
EnrichedResourceRequest enrichedRR) {
Set<String> allocatedNodes, EnrichedResourceRequest enrichedRR) {
LinkedList<RemoteNode> retList = new LinkedList<>();
String partition = getRequestPartition(enrichedRR);
if (loopIndex > 1) {
@ -495,8 +498,9 @@ public class OpportunisticContainerAllocator {
allNodes, enrichedRR, retList, numContainers);
} else {
// Rack local candidates
numContainers = collectRackLocalCandidates(
allNodes, enrichedRR, retList, blackList, numContainers);
numContainers =
collectRackLocalCandidates(allNodes, enrichedRR, retList,
blackList, allocatedNodes, numContainers);
}
if (numContainers == enrichedRR.getRequest().getNumContainers()) {
// If there is no change in numContainers, then there is no point
@ -510,12 +514,16 @@ public class OpportunisticContainerAllocator {
private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
Set<String> blackList, int numContainers) {
Set<String> blackList, Set<String> allocatedNodes, int numContainers) {
String partition = getRequestPartition(enrichedRR);
for (RemoteNode rNode : allNodes.values()) {
if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) &&
enrichedRR.getRackLocations().contains(rNode.getRackName())) {
if (blackList.contains(rNode.getNodeId().getHost())) {
String rHost = rNode.getNodeId().getHost();
if (blackList.contains(rHost)) {
continue;
}
if (allocatedNodes.contains(rHost)) {
retList.addLast(rNode);
} else {
retList.addFirst(rNode);

View File

@ -196,18 +196,6 @@ public class TestOpportunisticContainerAllocator {
new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs =
Arrays.asList(
ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1))
.resourceName("/r1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1))
.resourceName("h1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1))
.resourceName(ResourceRequest.ANY)
@ -227,6 +215,24 @@ public class TestOpportunisticContainerAllocator {
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1))
.resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(3)
.priority(Priority.newInstance(1))
.resourceName("/r1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(3)
.priority(Priority.newInstance(1))
.resourceName("h1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(3)
.priority(Priority.newInstance(1))
.resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB))
@ -247,14 +253,14 @@ public class TestOpportunisticContainerAllocator {
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
LOG.info("Containers: {}", containers);
Set<String> allocatedHosts = new HashSet<>();
// all 3 containers should be allocated.
Assert.assertEquals(3, containers.size());
// container with allocation id 2 and 3 should be allocated on node h1
for (Container c : containers) {
allocatedHosts.add(c.getNodeHttpAddress());
if (c.getAllocationRequestId() == 2 || c.getAllocationRequestId() == 3) {
Assert.assertEquals("h1:1234", c.getNodeHttpAddress());
}
}
Assert.assertEquals(2, containers.size());
Assert.assertTrue(allocatedHosts.contains("h1:1234"));
Assert.assertFalse(allocatedHosts.contains("h2:1234"));
Assert.assertFalse(allocatedHosts.contains("h3:1234"));
}
@Test