YARN-7822. Constraint satisfaction checker support for composite OR and AND constraints. (Weiwei Yang via asuresh)

This commit is contained in:
Arun Suresh 2018-01-30 10:15:33 -08:00
parent 8d1e2c6409
commit d481344783
4 changed files with 444 additions and 48 deletions

View File

@ -156,7 +156,7 @@ public class TestPlacementConstraintTransformations {
SingleConstraintTransformer singleTransformer =
new SingleConstraintTransformer(specConstraint);
PlacementConstraint simConstraint = singleTransformer.transform();
Assert.assertTrue(constraintExpr instanceof Or);
Assert.assertTrue(simConstraint.getConstraintExpr() instanceof Or);
Or simOrExpr = (Or) specConstraint.getConstraintExpr();
for (AbstractConstraint child : simOrExpr.getChildren()) {
Assert.assertTrue(child instanceof SingleConstraint);

View File

@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.Or;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
@ -149,6 +151,48 @@ public final class PlacementConstraintsUtil {
return true;
}
/**
* Returns true if all child constraints are satisfied.
* @param appId application id
* @param constraint Or constraint
* @param node node
* @param atm allocation tags manager
* @return true if all child constraints are satisfied, false otherwise
* @throws InvalidAllocationTagsQueryException
*/
private static boolean canSatisfyAndConstraint(ApplicationId appId,
And constraint, SchedulerNode node, AllocationTagsManager atm)
throws InvalidAllocationTagsQueryException {
// Iterate over the constraints tree, if found any child constraint
// isn't satisfied, return false.
for (AbstractConstraint child : constraint.getChildren()) {
if(!canSatisfyConstraints(appId, child.build(), node, atm)) {
return false;
}
}
return true;
}
/**
* Returns true as long as any of child constraint is satisfied.
* @param appId application id
* @param constraint Or constraint
* @param node node
* @param atm allocation tags manager
* @return true if any child constraint is satisfied, false otherwise
* @throws InvalidAllocationTagsQueryException
*/
private static boolean canSatisfyOrConstraint(ApplicationId appId,
Or constraint, SchedulerNode node, AllocationTagsManager atm)
throws InvalidAllocationTagsQueryException {
for (AbstractConstraint child : constraint.getChildren()) {
if (canSatisfyConstraints(appId, child.build(), node, atm)) {
return true;
}
}
return false;
}
private static boolean canSatisfyConstraints(ApplicationId appId,
PlacementConstraint constraint, SchedulerNode node,
AllocationTagsManager atm)
@ -167,9 +211,16 @@ public final class PlacementConstraintsUtil {
if (sConstraintExpr instanceof SingleConstraint) {
SingleConstraint single = (SingleConstraint) sConstraintExpr;
return canSatisfySingleConstraint(appId, single, node, atm);
} else if (sConstraintExpr instanceof And) {
And and = (And) sConstraintExpr;
return canSatisfyAndConstraint(appId, and, node, atm);
} else if (sConstraintExpr instanceof Or) {
Or or = (Or) sConstraintExpr;
return canSatisfyOrConstraint(appId, or, node, atm);
} else {
throw new InvalidAllocationTagsQueryException(
"Unsupported type of constraint.");
"Unsupported type of constraint: "
+ sConstraintExpr.getClass().getSimpleName());
}
}

View File

@ -21,7 +21,12 @@ import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.maxCardinality;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.AbstractMap;
import java.util.Arrays;
@ -34,6 +39,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -44,12 +50,11 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
@ -66,9 +71,10 @@ public class TestPlacementConstraintsUtil {
private RMContext rmContext;
private static final int GB = 1024;
private ApplicationId appId1;
private PlacementConstraint c1, c2, c3, c4;
private PlacementConstraint c1, c2, c3, c4, c5, c6, c7;
private Set<String> sourceTag1, sourceTag2;
private Map<Set<String>, PlacementConstraint> constraintMap1, constraintMap2;
private Map<Set<String>, PlacementConstraint> constraintMap1,
constraintMap2, constraintMap3, constraintMap4;
private AtomicLong requestID = new AtomicLong(0);
@Before
@ -92,6 +98,16 @@ public class TestPlacementConstraintsUtil {
.build(targetNotIn(NODE, allocationTag("hbase-m")));
c4 = PlacementConstraints
.build(targetNotIn(RACK, allocationTag("hbase-rs")));
c5 = PlacementConstraints
.build(and(targetNotIn(NODE, allocationTag("hbase-m")),
maxCardinality(NODE, 3, "spark")));
c6 = PlacementConstraints
.build(or(targetIn(NODE, allocationTag("hbase-m")),
targetIn(NODE, allocationTag("hbase-rs"))));
c7 = PlacementConstraints
.build(or(targetIn(NODE, allocationTag("hbase-m")),
and(targetIn(NODE, allocationTag("hbase-rs")),
targetIn(NODE, allocationTag("spark")))));
sourceTag1 = new HashSet<>(Arrays.asList("spark"));
sourceTag2 = new HashSet<>(Arrays.asList("zk"));
@ -106,6 +122,15 @@ public class TestPlacementConstraintsUtil {
new AbstractMap.SimpleEntry<>(sourceTag2, c4))
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue));
constraintMap3 = Stream
.of(new AbstractMap.SimpleEntry<>(sourceTag1, c5))
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue));
constraintMap4 = Stream
.of(new AbstractMap.SimpleEntry<>(sourceTag1, c6),
new AbstractMap.SimpleEntry<>(sourceTag2, c7))
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue));
}
private SchedulingRequest createSchedulingRequest(Set<String> allocationTags,
@ -124,6 +149,20 @@ public class TestPlacementConstraintsUtil {
return createSchedulingRequest(allocationTags, null);
}
private ContainerId newContainerId(ApplicationId appId) {
return ContainerId.newContainerId(
ApplicationAttemptId.newInstance(appId, 0), 0);
}
private SchedulerNode newSchedulerNode(String hostname, String rackName,
NodeId nodeId) {
SchedulerNode node = mock(SchedulerNode.class);
when(node.getNodeName()).thenReturn(hostname);
when(node.getRackName()).thenReturn(rackName);
when(node.getNodeID()).thenReturn(nodeId);
return node;
}
@Test
public void testNodeAffinityAssignment()
throws InvalidAllocationTagsQueryException {
@ -137,8 +176,9 @@ public class TestPlacementConstraintsUtil {
Iterator<RMNode> nodeIterator = rmNodes.iterator();
while (nodeIterator.hasNext()) {
RMNode currentNode = nodeIterator.next();
FiCaSchedulerNode schedulerNode = TestUtils.getMockNode(
currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB);
SchedulerNode schedulerNode =newSchedulerNode(currentNode.getHostName(),
currentNode.getRackName(), currentNode.getNodeID());
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
@ -153,14 +193,15 @@ public class TestPlacementConstraintsUtil {
RMNode n1_r1 = rmNodes.get(1);
RMNode n2_r2 = rmNodes.get(2);
RMNode n3_r2 = rmNodes.get(3);
FiCaSchedulerNode schedulerNode0 = TestUtils
.getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode1 = TestUtils
.getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode2 = TestUtils
.getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode3 = TestUtils
.getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
n0_r1.getRackName(), n0_r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
n1_r1.getRackName(), n1_r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
n2_r2.getRackName(), n2_r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
n3_r2.getRackName(), n3_r2.getNodeID());
// 1 Containers on node 0 with allocationTag 'hbase-m'
ContainerId hbase_m = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
@ -200,14 +241,15 @@ public class TestPlacementConstraintsUtil {
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
FiCaSchedulerNode schedulerNode0 = TestUtils
.getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode1 = TestUtils
.getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode2 = TestUtils
.getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode3 = TestUtils
.getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
n0_r1.getRackName(), n0_r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
n1_r1.getRackName(), n1_r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
n2_r2.getRackName(), n2_r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
n3_r2.getRackName(), n3_r2.getNodeID());
// 'zk' placement on Rack1 should now SUCCEED
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm));
@ -238,14 +280,16 @@ public class TestPlacementConstraintsUtil {
RMNode n1_r1 = rmNodes.get(1);
RMNode n2_r2 = rmNodes.get(2);
RMNode n3_r2 = rmNodes.get(3);
FiCaSchedulerNode schedulerNode0 = TestUtils
.getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode1 = TestUtils
.getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode2 = TestUtils
.getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode3 = TestUtils
.getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
n0_r1.getRackName(), n0_r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
n1_r1.getRackName(), n1_r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
n2_r2.getRackName(), n2_r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
n3_r2.getRackName(), n3_r2.getNodeID());
// 1 Containers on node 0 with allocationTag 'hbase-m'
ContainerId hbase_m = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
@ -285,14 +329,14 @@ public class TestPlacementConstraintsUtil {
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
FiCaSchedulerNode schedulerNode0 = TestUtils
.getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode1 = TestUtils
.getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode2 = TestUtils
.getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode3 = TestUtils
.getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
n0_r1.getRackName(), n0_r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
n1_r1.getRackName(), n1_r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
n2_r2.getRackName(), n2_r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
n3_r2.getRackName(), n3_r2.getNodeID());
// 'zk' placement on Rack1 should FAIL
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
@ -306,4 +350,162 @@ public class TestPlacementConstraintsUtil {
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm));
}
@Test
public void testORConstraintAssignment()
throws InvalidAllocationTagsQueryException {
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
// Register App1 with anti-affinity constraint map.
pcm.registerApplication(appId1, constraintMap4);
RMNode n0r1 = rmNodes.get(0);
RMNode n1r1 = rmNodes.get(1);
RMNode n2r2 = rmNodes.get(2);
RMNode n3r2 = rmNodes.get(3);
/**
* Place container:
* n0: hbase-m(1)
* n1: ""
* n2: hbase-rs(1)
* n3: ""
*/
tm.addContainer(n0r1.getNodeID(),
newContainerId(appId1), ImmutableSet.of("hbase-m"));
tm.addContainer(n2r2.getNodeID(),
newContainerId(appId1), ImmutableSet.of("hbase-rs"));
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
.get("hbase-m").longValue());
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
.get("hbase-rs").longValue());
SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(),
n0r1.getRackName(), n0r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(),
n1r1.getRackName(), n1r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(),
n2r2.getRackName(), n2r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(),
n3r2.getRackName(), n3r2.getNodeID());
// n0 and n2 should be qualified for allocation as
// they either have hbase-m or hbase-rs tag
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
/**
* Place container:
* n0: hbase-m(1)
* n1: ""
* n2: hbase-rs(1)
* n3: hbase-rs(1)
*/
tm.addContainer(n3r2.getNodeID(),
newContainerId(appId1), ImmutableSet.of("hbase-rs"));
// n3 is qualified now because it is allocated with hbase-rs tag
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
/**
* Place container:
* n0: hbase-m(1)
* n1: ""
* n2: hbase-rs(1), spark(1)
* n3: hbase-rs(1)
*/
// Place
tm.addContainer(n2r2.getNodeID(),
newContainerId(appId1), ImmutableSet.of("spark"));
// According to constraint, "zk" is allowed to be placed on a node
// has "hbase-m" tag OR a node has both "hbase-rs" and "spark" tags.
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode1, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm));
}
@Test
public void testANDConstraintAssignment()
throws InvalidAllocationTagsQueryException {
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
// Register App1 with anti-affinity constraint map.
pcm.registerApplication(appId1, constraintMap3);
RMNode n0r1 = rmNodes.get(0);
RMNode n1r1 = rmNodes.get(1);
RMNode n2r2 = rmNodes.get(2);
RMNode n3r2 = rmNodes.get(3);
/**
* Place container:
* n0: hbase-m(1)
* n1: ""
* n2: hbase-m(1)
* n3: ""
*/
tm.addContainer(n0r1.getNodeID(),
newContainerId(appId1), ImmutableSet.of("hbase-m"));
tm.addContainer(n2r2.getNodeID(),
newContainerId(appId1), ImmutableSet.of("hbase-m"));
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
.get("hbase-m").longValue());
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
.get("hbase-m").longValue());
SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(),
n0r1.getRackName(), n0r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(),
n1r1.getRackName(), n1r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(),
n2r2.getRackName(), n2r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(),
n3r2.getRackName(), n3r2.getNodeID());
// Anti-affinity with hbase-m so it should not be able to be placed
// onto n0 and n2 as they already have hbase-m allocated.
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
/**
* Place container:
* n0: hbase-m(1)
* n1: spark(3)
* n2: hbase-m(1)
* n3: ""
*/
for (int i=0; i<4; i++) {
tm.addContainer(n1r1.getNodeID(),
newContainerId(appId1), ImmutableSet.of("spark"));
}
Assert.assertEquals(4L, tm.getAllocationTagsWithCount(n1r1.getNodeID())
.get("spark").longValue());
// Violate cardinality constraint
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
}
}

View File

@ -60,6 +60,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.Thread.sleep;
import static org.apache.hadoop.yarn.api.records.RejectionReason.COULD_NOT_PLACE_ON_NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
@ -142,7 +143,8 @@ public class TestPlacementProcessor {
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 4);
waitForContainerAllocation(nodes.values(), am1,
allocatedContainers, new ArrayList<>(), 4);
Assert.assertEquals(4, allocatedContainers.size());
Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
@ -195,7 +197,8 @@ public class TestPlacementProcessor {
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
waitForContainerAllocation(nodes.values(), am1,
allocatedContainers, new ArrayList<>(), 5);
Assert.assertEquals(5, allocatedContainers.size());
Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
@ -244,7 +247,8 @@ public class TestPlacementProcessor {
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 8);
waitForContainerAllocation(nodes.values(), am1,
allocatedContainers, new ArrayList<>(), 8);
Assert.assertEquals(8, allocatedContainers.size());
Map<NodeId, Long> nodeIdContainerIdMap =
@ -294,7 +298,8 @@ public class TestPlacementProcessor {
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
waitForContainerAllocation(nodes.values(), am1,
allocatedContainers, new ArrayList<>(), 5);
Assert.assertEquals(5, allocatedContainers.size());
Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
@ -347,7 +352,8 @@ public class TestPlacementProcessor {
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 6);
waitForContainerAllocation(nodes.values(), am1,
allocatedContainers, new ArrayList<>(), 6);
Assert.assertEquals(6, allocatedContainers.size());
Map<NodeId, Long> nodeIdContainerIdMap =
@ -584,7 +590,7 @@ public class TestPlacementProcessor {
// Ensure unique nodes
Assert.assertEquals(4, nodeIds.size());
RejectedSchedulingRequest rej = rejectedReqs.get(0);
Assert.assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE,
Assert.assertEquals(COULD_NOT_PLACE_ON_NODE,
rej.getReason());
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
@ -592,9 +598,145 @@ public class TestPlacementProcessor {
verifyMetrics(metrics, 11264, 11, 5120, 5, 5);
}
@Test(timeout = 300000)
public void testAndOrPlacement() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 40960, 100,
rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
MockNM nm2 = new MockNM("h2:1234", 40960, 100,
rm.getResourceTrackerService());
nodes.put(nm2.getNodeId(), nm2);
MockNM nm3 = new MockNM("h3:1234", 40960, 100,
rm.getResourceTrackerService());
nodes.put(nm3.getNodeId(), nm3);
MockNM nm4 = new MockNM("h4:1234", 40960, 100,
rm.getResourceTrackerService());
nodes.put(nm4.getNodeId(), nm4);
nm1.registerNode();
nm2.registerNode();
nm3.registerNode();
nm4.registerNode();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
// Register app1 with following constraints
// 1) foo anti-affinity with foo on node
// 2) bar anti-affinity with foo on node AND maxCardinality = 2
// 3) moo affinity with foo OR bar
Map<Set<String>, PlacementConstraint> app1Constraints = new HashMap<>();
app1Constraints.put(Collections.singleton("foo"),
PlacementConstraints.build(
PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
app1Constraints.put(Collections.singleton("bar"),
PlacementConstraints.build(
PlacementConstraints.and(
PlacementConstraints.targetNotIn(NODE, allocationTag("foo")),
PlacementConstraints.maxCardinality(NODE, 2, "bar"))));
app1Constraints.put(Collections.singleton("moo"),
PlacementConstraints.build(
PlacementConstraints.or(
PlacementConstraints.targetIn(NODE, allocationTag("foo")),
PlacementConstraints.targetIn(NODE, allocationTag("bar")))));
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, app1Constraints);
// Allocates 3 foo containers on 3 different nodes,
// in anti-affinity fashion.
am1.addSchedulingRequest(
Arrays.asList(
schedulingRequest(1, 1, 1, 512, "foo"),
schedulingRequest(1, 2, 1, 512, "foo"),
schedulingRequest(1, 3, 1, 512, "foo")
));
List<Container> allocatedContainers = new ArrayList<>();
waitForContainerAllocation(nodes.values(), am1,
allocatedContainers, new ArrayList<>(), 3);
printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager());
Assert.assertEquals(3, allocatedContainers.size());
/** Testing AND placement constraint**/
// Now allocates a bar container, as restricted by the AND constraint,
// bar could be only allocated to the node without foo
am1.addSchedulingRequest(
Arrays.asList(
schedulingRequest(1, 1, 1, 512, "bar")
));
allocatedContainers.clear();
waitForContainerAllocation(nodes.values(), am1,
allocatedContainers, new ArrayList<>(), 1);
printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager());
Assert.assertEquals(1, allocatedContainers.size());
NodeId barNode = allocatedContainers.get(0).getNodeId();
// Sends another 3 bar request, 2 of them can be allocated
// as maxCardinality is 2, for placed containers, they should be all
// on the node where the last bar was placed.
allocatedContainers.clear();
List<RejectedSchedulingRequest> rejectedContainers = new ArrayList<>();
am1.addSchedulingRequest(
Arrays.asList(
schedulingRequest(1, 2, 1, 512, "bar"),
schedulingRequest(1, 3, 1, 512, "bar"),
schedulingRequest(1, 4, 1, 512, "bar")
));
waitForContainerAllocation(nodes.values(), am1,
allocatedContainers, rejectedContainers, 2);
printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager());
Assert.assertEquals(2, allocatedContainers.size());
Assert.assertTrue(allocatedContainers.stream().allMatch(
container -> container.getNodeId().equals(barNode)));
// The third request could not be satisfied because it violates
// the cardinality constraint. Validate rejected request correctly
// capture this.
Assert.assertEquals(1, rejectedContainers.size());
Assert.assertEquals(COULD_NOT_PLACE_ON_NODE,
rejectedContainers.get(0).getReason());
/** Testing OR placement constraint**/
// Register one more NM for testing
MockNM nm5 = new MockNM("h5:1234", 4096, 100,
rm.getResourceTrackerService());
nodes.put(nm5.getNodeId(), nm5);
nm5.registerNode();
nm5.nodeHeartbeat(true);
List<SchedulingRequest> mooRequests = new ArrayList<>();
for (int i=5; i<25; i++) {
mooRequests.add(schedulingRequest(1, i, 1, 100, "moo"));
}
am1.addSchedulingRequest(mooRequests);
allocatedContainers.clear();
waitForContainerAllocation(nodes.values(), am1,
allocatedContainers, new ArrayList<>(), 20);
// All 20 containers should be allocated onto nodes besides nm5,
// because moo affinity to foo or bar which only exists on rest of nodes.
Assert.assertEquals(20, allocatedContainers.size());
for (Container mooContainer : allocatedContainers) {
// nm5 has no moo allocated containers.
Assert.assertFalse(mooContainer.getNodeId().equals(nm5.getNodeId()));
}
}
private static void printTags(Collection<MockNM> nodes,
AllocationTagsManager atm){
for (MockNM nm : nodes) {
Map<String, Long> nmTags = atm
.getAllocationTagsWithCount(nm.getNodeId());
StringBuffer sb = new StringBuffer();
if (nmTags != null) {
nmTags.forEach((tag, count) ->
sb.append(tag + "(" + count + "),"));
LOG.info("nm_" + nm.getNodeId() + ": " + sb.toString());
}
}
}
private static void waitForContainerAllocation(Collection<MockNM> nodes,
MockAM am, List<Container> allocatedContainers, int containerNum)
throws Exception {
MockAM am, List<Container> allocatedContainers,
List<RejectedSchedulingRequest> rejectedRequests,
int containerNum) throws Exception {
int attemptCount = 10;
while (allocatedContainers.size() < containerNum && attemptCount > 0) {
for (MockNM node : nodes) {
@ -605,6 +747,7 @@ public class TestPlacementProcessor {
sleep(1000);
AllocateResponse allocResponse = am.schedule();
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
rejectedRequests.addAll(allocResponse.getRejectedSchedulingRequests());
attemptCount--;
}
}