YARN-8195. Fix constraint cardinality check in the presence of multiple target allocation tags. Contributed by Weiwei Yang.
(cherry picked from commit 9b09555451
)
This commit is contained in:
parent
ce62991648
commit
9d2967098d
|
@ -91,20 +91,20 @@ public final class PlacementConstraintsUtil {
|
||||||
if (sc.getScope().equals(PlacementConstraints.NODE)) {
|
if (sc.getScope().equals(PlacementConstraints.NODE)) {
|
||||||
if (checkMinCardinality) {
|
if (checkMinCardinality) {
|
||||||
minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
|
minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
|
||||||
allocationTags, Long::max);
|
allocationTags, Long::min);
|
||||||
}
|
}
|
||||||
if (checkMaxCardinality) {
|
if (checkMaxCardinality) {
|
||||||
maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
|
maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
|
||||||
allocationTags, Long::min);
|
allocationTags, Long::max);
|
||||||
}
|
}
|
||||||
} else if (sc.getScope().equals(PlacementConstraints.RACK)) {
|
} else if (sc.getScope().equals(PlacementConstraints.RACK)) {
|
||||||
if (checkMinCardinality) {
|
if (checkMinCardinality) {
|
||||||
minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
|
minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
|
||||||
allocationTags, Long::max);
|
allocationTags, Long::min);
|
||||||
}
|
}
|
||||||
if (checkMaxCardinality) {
|
if (checkMaxCardinality) {
|
||||||
maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
|
maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
|
||||||
allocationTags, Long::min);
|
allocationTags, Long::max);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
@ -227,6 +228,93 @@ public class TestPlacementConstraintsUtil {
|
||||||
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
|
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiTagsPlacementConstraints()
|
||||||
|
throws InvalidAllocationTagsQueryException {
|
||||||
|
PlacementConstraintManagerService pcm =
|
||||||
|
new MemoryPlacementConstraintManager();
|
||||||
|
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
|
||||||
|
rmContext.setAllocationTagsManager(tm);
|
||||||
|
rmContext.setPlacementConstraintManager(pcm);
|
||||||
|
|
||||||
|
HashSet<String> st1 = new HashSet<>(Arrays.asList("X"));
|
||||||
|
HashSet<String> st2 = new HashSet<>(Arrays.asList("Y"));
|
||||||
|
|
||||||
|
// X anti-affinity with A and B
|
||||||
|
PlacementConstraint pc1 = PlacementConstraints.build(
|
||||||
|
targetNotIn(NODE, allocationTag("A", "B")));
|
||||||
|
// Y affinity with A and B
|
||||||
|
PlacementConstraint pc2 = PlacementConstraints.build(
|
||||||
|
targetIn(NODE, allocationTag("A", "B")));
|
||||||
|
Map<Set<String>, PlacementConstraint> constraintMap =
|
||||||
|
ImmutableMap.of(st1, pc1, st2, pc2);
|
||||||
|
// Register App1 with affinity constraint map
|
||||||
|
pcm.registerApplication(appId1, constraintMap);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Now place container:
|
||||||
|
* n0: A(1)
|
||||||
|
* n1: B(1)
|
||||||
|
* n2:
|
||||||
|
* n3:
|
||||||
|
*/
|
||||||
|
RMNode n0_r1 = rmNodes.get(0);
|
||||||
|
RMNode n1_r1 = rmNodes.get(1);
|
||||||
|
RMNode n2_r2 = rmNodes.get(2);
|
||||||
|
RMNode n3_r2 = rmNodes.get(3);
|
||||||
|
SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
|
||||||
|
n0_r1.getRackName(), n0_r1.getNodeID());
|
||||||
|
SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
|
||||||
|
n1_r1.getRackName(), n1_r1.getNodeID());
|
||||||
|
SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
|
||||||
|
n2_r2.getRackName(), n2_r2.getNodeID());
|
||||||
|
SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
|
||||||
|
n3_r2.getRackName(), n3_r2.getNodeID());
|
||||||
|
|
||||||
|
ContainerId ca = ContainerId
|
||||||
|
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
|
||||||
|
tm.addContainer(n0_r1.getNodeID(), ca, ImmutableSet.of("A"));
|
||||||
|
|
||||||
|
ContainerId cb = ContainerId
|
||||||
|
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
|
||||||
|
tm.addContainer(n1_r1.getNodeID(), cb, ImmutableSet.of("B"));
|
||||||
|
|
||||||
|
// n0 and n1 has A/B so they cannot satisfy the PC
|
||||||
|
// n2 and n3 doesn't have A or B, so they can satisfy the PC
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
||||||
|
createSchedulingRequest(st1), schedulerNode0, pcm, tm));
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
||||||
|
createSchedulingRequest(st1), schedulerNode1, pcm, tm));
|
||||||
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
||||||
|
createSchedulingRequest(st1), schedulerNode2, pcm, tm));
|
||||||
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
||||||
|
createSchedulingRequest(st1), schedulerNode3, pcm, tm));
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Now place container:
|
||||||
|
* n0: A(1)
|
||||||
|
* n1: B(1)
|
||||||
|
* n2: A(1), B(1)
|
||||||
|
* n3:
|
||||||
|
*/
|
||||||
|
ContainerId ca1 = ContainerId
|
||||||
|
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
|
||||||
|
tm.addContainer(n2_r2.getNodeID(), ca1, ImmutableSet.of("A"));
|
||||||
|
ContainerId cb1 = ContainerId
|
||||||
|
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
|
||||||
|
tm.addContainer(n2_r2.getNodeID(), cb1, ImmutableSet.of("B"));
|
||||||
|
|
||||||
|
// Only n2 has both A and B so only it can satisfy the PC
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
||||||
|
createSchedulingRequest(st2), schedulerNode0, pcm, tm));
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
||||||
|
createSchedulingRequest(st2), schedulerNode1, pcm, tm));
|
||||||
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
||||||
|
createSchedulingRequest(st2), schedulerNode2, pcm, tm));
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
||||||
|
createSchedulingRequest(st2), schedulerNode3, pcm, tm));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRackAffinityAssignment()
|
public void testRackAffinityAssignment()
|
||||||
throws InvalidAllocationTagsQueryException {
|
throws InvalidAllocationTagsQueryException {
|
||||||
|
|
Loading…
Reference in New Issue