YARN-7682. Expose canSatisfyConstraints utility function to validate a placement against a constraint. (Panagiotis Garefalakis via asuresh)

Arun Suresh 2018-01-03 08:00:50 -08:00
parent a52d11fb8c
commit bdba01f73b
4 changed files with 601 additions and 77 deletions


@@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import java.util.Iterator;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
/**
* This class contains various static methods used by the Placement Algorithms
* to simplify constrained placement.
* (see also {@link DefaultPlacementAlgorithm}).
*/
@Public
@Unstable
public final class PlacementConstraintsUtil {
// Suppresses default constructor, ensuring non-instantiability.
private PlacementConstraintsUtil() {
}
/**
* Returns true if a single application constraint with its associated
* allocationTags and scope is satisfied by a specific scheduler Node.
*
* @param appId the application id
* @param sc the placement constraint
* @param te the target expression
* @param node the scheduler node
* @param tm the allocation tags store
* @return true if single application constraint is satisfied by node
* @throws InvalidAllocationTagsQueryException if the allocation tags query is invalid
*/
private static boolean canSatisfySingleConstraintExpression(
ApplicationId appId, SingleConstraint sc, TargetExpression te,
SchedulerNode node, AllocationTagsManager tm)
throws InvalidAllocationTagsQueryException {
long minScopeCardinality = 0;
long maxScopeCardinality = 0;
if (sc.getScope() == PlacementConstraints.NODE) {
minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
te.getTargetValues(), Long::max);
maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
te.getTargetValues(), Long::min);
} else if (sc.getScope() == PlacementConstraints.RACK) {
minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
te.getTargetValues(), Long::max);
maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
te.getTargetValues(), Long::min);
}
// Anti-affinity is expressed as a max cardinality of 0: decrement the
// measured cardinality so the strict '<' check below passes only when no
// containers with the target tags are present in the scope.
maxScopeCardinality = sc.getMaxCardinality() == 0 ? maxScopeCardinality - 1
: maxScopeCardinality;
return (minScopeCardinality >= sc.getMinCardinality()
&& maxScopeCardinality < sc.getMaxCardinality());
}
/**
* Returns true if all application constraints with the associated
* allocationTags are currently satisfied by a specific scheduler Node.
* To do so, the method retrieves all application constraint expressions and
* checks whether the given allocation falls within the allowed min/max
* cardinality values under the constraint scope (Node/Rack/etc).
*
* @param appId the application id
* @param allocationTags the allocation tags set
* @param node the scheduler node
* @param pcm the placement constraints store
* @param tagsManager the allocation tags store
* @return true if all application constraints are satisfied by node
* @throws InvalidAllocationTagsQueryException if the allocation tags query is invalid
*/
public static boolean canSatisfyConstraints(ApplicationId appId,
Set<String> allocationTags, SchedulerNode node,
PlacementConstraintManager pcm, AllocationTagsManager tagsManager)
throws InvalidAllocationTagsQueryException {
PlacementConstraint constraint = pcm.getConstraint(appId, allocationTags);
if (constraint == null) {
return true;
}
// Transform to SingleConstraint
SingleConstraintTransformer singleTransformer =
new SingleConstraintTransformer(constraint);
constraint = singleTransformer.transform();
AbstractConstraint sConstraintExpr = constraint.getConstraintExpr();
SingleConstraint single = (SingleConstraint) sConstraintExpr;
// Iterate through TargetExpressions
Iterator<TargetExpression> expIt = single.getTargetExpressions().iterator();
while (expIt.hasNext()) {
TargetExpression currentExp = expIt.next();
// Only AllocationTag target expressions are supported for now
if (currentExp.getTargetType().equals(TargetType.ALLOCATION_TAG)) {
// Check if conditions are met
if (!canSatisfySingleConstraintExpression(appId, single, currentExp,
node, tagsManager)) {
return false;
}
}
}
// return true if all targetExpressions are satisfied
return true;
}
}
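
Usage note (editorial, not part of the patch): a minimal sketch of how scheduler-side code could delegate a placement check to the new utility. The wrapper class and method names below are illustrative; only PlacementConstraintsUtil.canSatisfyConstraints and its parameter types come from this change, and the constraint manager, tags manager, node and application id are assumed to be supplied by the surrounding scheduler.

import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;

/** Illustrative caller of the new utility; not part of the patch. */
public class ConstraintCheckSketch {

  /**
   * Returns true when the given node is an acceptable placement for the
   * application's allocation tags according to its registered constraints.
   */
  static boolean isAcceptablePlacement(ApplicationId appId,
      Set<String> allocationTags, SchedulerNode node,
      PlacementConstraintManager pcm, AllocationTagsManager tagsManager) {
    try {
      return PlacementConstraintsUtil.canSatisfyConstraints(appId,
          allocationTags, node, pcm, tagsManager);
    } catch (InvalidAllocationTagsQueryException e) {
      // Conservatively treat a failed tags query as "cannot place here".
      return false;
    }
  }
}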


@@ -19,19 +19,16 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
@@ -65,58 +62,14 @@ public void init(RMContext rmContext) {
.getNodes(filter);
}
/**
* TODO: Method will be moved to PlacementConstraintsUtil class (YARN-7682)
* @param applicationId
* @param allocationTags
* @param nodeId
* @param tagsManager
* @return boolean
* @throws InvalidAllocationTagsQueryException
*/
public boolean canAssign(ApplicationId applicationId,
Set<String> allocationTags, NodeId nodeId,
AllocationTagsManager tagsManager)
throws InvalidAllocationTagsQueryException {
PlacementConstraint constraint =
constraintManager.getConstraint(applicationId, allocationTags);
if (constraint == null) {
return true;
}
// TODO: proper transformations
// Currently works only for simple anti-affinity
// NODE scope target expressions
PlacementConstraintTransformations.SpecializedConstraintTransformer transformer =
new PlacementConstraintTransformations.SpecializedConstraintTransformer(
constraint);
PlacementConstraint transform = transformer.transform();
PlacementConstraint.TargetConstraint targetConstraint =
(PlacementConstraint.TargetConstraint) transform.getConstraintExpr();
// Assume a single target expression tag;
// The Sample Algorithm assumes a constraint will always be a simple
// Target Constraint with a single entry in the target set.
// As mentioned in the class javadoc - This algorithm should be
// used mostly for testing and validating end-2-end workflow.
String targetTag = targetConstraint.getTargetExpressions().iterator().next()
.getTargetValues().iterator().next();
// TODO: Assuming anti-affinity constraint
long nodeCardinality =
tagsManager.getNodeCardinality(nodeId, applicationId, targetTag);
if (nodeCardinality != 0) {
return false;
}
// return true if it is a valid placement
return true;
}
public boolean attemptPlacementOnNode(ApplicationId appId,
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
throws InvalidAllocationTagsQueryException {
int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
if (numAllocs > 0) {
if (canAssign(appId,
schedulingRequest.getAllocationTags(), schedulerNode.getNodeID(),
tagsManager)) {
if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
schedulingRequest.getAllocationTags(), schedulerNode,
constraintManager, tagsManager)) {
return true;
}
}
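
For comparison with the removed canAssign path above, which special-cased simple NODE anti-affinity, the following illustrative sketch (not part of the patch) builds the same kind of anti-affinity constraint used in the tests below and runs it through the SingleConstraintTransformer; the resulting min/max cardinalities are what canSatisfySingleConstraintExpression compares against the node's tag cardinality.

import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;

/** Illustrative transformation of an anti-affinity constraint; not part of the patch. */
public class AntiAffinityTransformSketch {
  public static void main(String[] args) {
    // "Do not place next to containers tagged 'hbase-m' on the same node."
    PlacementConstraint constraint = PlacementConstraints
        .build(targetNotIn(NODE, allocationTag("hbase-m")));
    // The utility rewrites any registered constraint into a SingleConstraint
    // before evaluating it, exactly as canSatisfyConstraints does above.
    SingleConstraint single = (SingleConstraint) new SingleConstraintTransformer(
        constraint).transform().getConstraintExpr();
    // For anti-affinity the transformed max cardinality is expected to be 0,
    // so a node that already holds an 'hbase-m' container fails the check.
    System.out.println("scope=" + single.getScope()
        + ", min=" + single.getMinCardinality()
        + ", max=" + single.getMaxCardinality());
  }
}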


@@ -0,0 +1,287 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
/**
* Test the PlacementConstraint Utility class functionality.
*/
public class TestPlacementConstraintsUtil {
private List<RMNode> rmNodes;
private RMContext rmContext;
private static final int GB = 1024;
private ApplicationId appId1;
private PlacementConstraint c1, c2, c3, c4;
private Set<String> sourceTag1, sourceTag2;
private Map<Set<String>, PlacementConstraint> constraintMap1, constraintMap2;
@Before
public void setup() {
MockRM rm = new MockRM();
rm.start();
MockNodes.resetHostIds();
rmNodes = MockNodes.newNodes(2, 2, Resource.newInstance(4096, 4));
for (RMNode rmNode : rmNodes) {
rm.getRMContext().getRMNodes().putIfAbsent(rmNode.getNodeID(), rmNode);
}
rmContext = rm.getRMContext();
// Build appIDs, constraints, source tags, and constraint map.
long ts = System.currentTimeMillis();
appId1 = BuilderUtils.newApplicationId(ts, 123);
c1 = PlacementConstraints.build(targetIn(NODE, allocationTag("hbase-m")));
c2 = PlacementConstraints.build(targetIn(RACK, allocationTag("hbase-rs")));
c3 = PlacementConstraints
.build(targetNotIn(NODE, allocationTag("hbase-m")));
c4 = PlacementConstraints
.build(targetNotIn(RACK, allocationTag("hbase-rs")));
sourceTag1 = new HashSet<>(Arrays.asList("spark"));
sourceTag2 = new HashSet<>(Arrays.asList("zk"));
constraintMap1 = Stream
.of(new AbstractMap.SimpleEntry<>(sourceTag1, c1),
new AbstractMap.SimpleEntry<>(sourceTag2, c2))
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue));
constraintMap2 = Stream
.of(new AbstractMap.SimpleEntry<>(sourceTag1, c3),
new AbstractMap.SimpleEntry<>(sourceTag2, c4))
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue));
}
@Test
public void testNodeAffinityAssignment()
throws InvalidAllocationTagsQueryException {
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
// Register App1 with affinity constraint map
pcm.registerApplication(appId1, constraintMap1);
// No containers are running so all 'zk' and 'spark' allocations should fail
// on every cluster NODE
Iterator<RMNode> nodeIterator = rmNodes.iterator();
while (nodeIterator.hasNext()) {
RMNode currentNode = nodeIterator.next();
FiCaSchedulerNode schedulerNode = TestUtils.getMockNode(
currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB);
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode, pcm, tm));
}
/**
* Now place container:
* Node0:123 (Rack1):
* container_app1_1 (hbase-m)
*/
RMNode n0_r1 = rmNodes.get(0);
RMNode n1_r1 = rmNodes.get(1);
RMNode n2_r2 = rmNodes.get(2);
RMNode n3_r2 = rmNodes.get(3);
FiCaSchedulerNode schedulerNode0 = TestUtils
.getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode1 = TestUtils
.getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode2 = TestUtils
.getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode3 = TestUtils
.getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
// 1 Container on node 0 with allocationTag 'hbase-m'
ContainerId hbase_m = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
// 'spark' placement on Node0 should now SUCCEED
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode0, pcm, tm));
// FAIL on the rest of the nodes
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode3, pcm, tm));
}
@Test
public void testRackAffinityAssignment()
throws InvalidAllocationTagsQueryException {
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
// Register App1 with affinity constraint map
pcm.registerApplication(appId1, constraintMap1);
/**
* Now place container:
* Node0:123 (Rack1):
* container_app1_1 (hbase-rs)
*/
RMNode n0_r1 = rmNodes.get(0);
RMNode n1_r1 = rmNodes.get(1);
RMNode n2_r2 = rmNodes.get(2);
RMNode n3_r2 = rmNodes.get(3);
// 1 Container on Node0-Rack1 with allocationTag 'hbase-rs'
ContainerId hbase_m = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
FiCaSchedulerNode schedulerNode0 = TestUtils
.getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode1 = TestUtils
.getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode2 = TestUtils
.getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode3 = TestUtils
.getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
// 'zk' placement on Rack1 should now SUCCEED
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode0, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode1, pcm, tm));
// FAIL on the rest of the RACKs
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode3, pcm, tm));
}
@Test
public void testNodeAntiAffinityAssignment()
throws InvalidAllocationTagsQueryException {
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
// Register App1 with anti-affinity constraint map
pcm.registerApplication(appId1, constraintMap2);
/**
* place container:
* Node0:123 (Rack1):
* container_app1_1 (hbase-m)
*/
RMNode n0_r1 = rmNodes.get(0);
RMNode n1_r1 = rmNodes.get(1);
RMNode n2_r2 = rmNodes.get(2);
RMNode n3_r2 = rmNodes.get(3);
FiCaSchedulerNode schedulerNode0 = TestUtils
.getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode1 = TestUtils
.getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode2 = TestUtils
.getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode3 = TestUtils
.getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
// 1 Container on node 0 with allocationTag 'hbase-m'
ContainerId hbase_m = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
// 'spark' placement on Node0 should now FAIL
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode0, pcm, tm));
// SUCCEED on the rest of the nodes
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode1, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode3, pcm, tm));
}
@Test
public void testRackAntiAffinityAssignment()
throws InvalidAllocationTagsQueryException {
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
// Register App1 with anti-affinity constraint map
pcm.registerApplication(appId1, constraintMap2);
/**
* Place container:
* Node0:123 (Rack1):
* container_app1_1 (hbase-rs)
*/
RMNode n0_r1 = rmNodes.get(0);
RMNode n1_r1 = rmNodes.get(1);
RMNode n2_r2 = rmNodes.get(2);
RMNode n3_r2 = rmNodes.get(3);
// 1 Container on Node0-Rack1 with allocationTag 'hbase-rs'
ContainerId hbase_m = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
FiCaSchedulerNode schedulerNode0 = TestUtils
.getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode1 = TestUtils
.getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode2 = TestUtils
.getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
FiCaSchedulerNode schedulerNode3 = TestUtils
.getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
// 'zk' placement on Rack1 should FAIL
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode1, pcm, tm));
// SUCCEED on the rest of the RACKs
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode3, pcm, tm));
}
}
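
The assertions above ultimately reduce to AllocationTagsManager cardinality queries. As an illustrative sketch (not part of the patch), the helper below performs the same NODE-scoped lookups that canSatisfySingleConstraintExpression uses; the tags manager, node id, application id and target tags are assumed to be provided by the caller, e.g. built against a mock RMContext as in the test setup above.

import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;

/** Illustrative cardinality lookup; not part of the patch. */
public class NodeCardinalitySketch {

  /**
   * Prints the min/max scope cardinality of the given allocation tags on one
   * node, mirroring the operators the utility uses before comparing them to
   * a constraint's min/max cardinality.
   */
  static void printNodeCardinalities(AllocationTagsManager tm, NodeId nodeId,
      ApplicationId appId, Set<String> targetTags)
      throws InvalidAllocationTagsQueryException {
    long minScopeCardinality =
        tm.getNodeCardinalityByOp(nodeId, appId, targetTags, Long::max);
    long maxScopeCardinality =
        tm.getNodeCardinalityByOp(nodeId, appId, targetTags, Long::min);
    System.out.println("minScopeCardinality=" + minScopeCardinality
        + ", maxScopeCardinality=" + maxScopeCardinality);
  }
}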


@@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -48,16 +49,21 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.Thread.sleep;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
/**
* This tests end2end workflow of the constraint placement framework.
@@ -104,7 +110,7 @@ public void stopRM() {
}
@Test(timeout = 300000)
public void testPlacement() throws Exception {
public void testAntiAffinityPlacement() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
@@ -120,43 +126,173 @@ public void testPlacement() throws Exception {
nm4.registerNode();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
// Containers with allocationTag 'foo' are restricted to 1 per NODE
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
Collections.singletonMap(
Collections.singleton("foo"),
Collections.singletonMap(Collections.singleton("foo"),
PlacementConstraints.build(
PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
));
PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))));
am1.addSchedulingRequest(
Arrays.asList(
schedulingRequest(1, 1, 1, 512, "foo"),
Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"),
schedulingRequest(1, 2, 1, 512, "foo"),
schedulingRequest(1, 3, 1, 512, "foo"),
schedulingRequest(1, 5, 1, 512, "foo"))
);
schedulingRequest(1, 5, 1, 512, "foo")));
AllocateResponse allocResponse = am1.schedule(); // send the request
List<Container> allocatedContainers = new ArrayList<>();
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
while (allocatedContainers.size() < 4) {
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
nm4.nodeHeartbeat(true);
LOG.info("Waiting for containers to be created for app 1...");
sleep(1000);
allocResponse = am1.schedule();
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
}
waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 4);
Assert.assertEquals(4, allocatedContainers.size());
Set<NodeId> nodeIds = allocatedContainers.stream()
.map(x -> x.getNodeId()).collect(Collectors.toSet());
// Ensure unique nodes
Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
.collect(Collectors.toSet());
// Ensure unique nodes (anti-affinity)
Assert.assertEquals(4, nodeIds.size());
}
@Test(timeout = 300000)
public void testCardinalityPlacement() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm2.getNodeId(), nm2);
MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm3.getNodeId(), nm3);
MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm4.getNodeId(), nm4);
nm1.registerNode();
nm2.registerNode();
nm3.registerNode();
nm4.registerNode();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
// Containers with allocationTag 'foo' should not exceed 4 per NODE
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
Collections.singletonMap(Collections.singleton("foo"),
PlacementConstraints.build(PlacementConstraints
.targetCardinality(NODE, 0, 4, allocationTag("foo")))));
am1.addSchedulingRequest(
Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"),
schedulingRequest(1, 2, 1, 512, "foo"),
schedulingRequest(1, 3, 1, 512, "foo"),
schedulingRequest(1, 4, 1, 512, "foo"),
schedulingRequest(1, 5, 1, 512, "foo"),
schedulingRequest(1, 6, 1, 512, "foo"),
schedulingRequest(1, 7, 1, 512, "foo"),
schedulingRequest(1, 8, 1, 512, "foo")));
AllocateResponse allocResponse = am1.schedule(); // send the request
List<Container> allocatedContainers = new ArrayList<>();
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 8);
Assert.assertEquals(8, allocatedContainers.size());
Map<NodeId, Long> nodeIdContainerIdMap =
allocatedContainers.stream().collect(
Collectors.groupingBy(c -> c.getNodeId(), Collectors.counting()));
// Ensure no more than 4 containers per node
for (NodeId n : nodeIdContainerIdMap.keySet()) {
Assert.assertTrue(nodeIdContainerIdMap.get(n) < 5);
}
}
@Test(timeout = 300000)
public void testAffinityPlacement() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm2.getNodeId(), nm2);
MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm3.getNodeId(), nm3);
MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm4.getNodeId(), nm4);
nm1.registerNode();
nm2.registerNode();
nm3.registerNode();
nm4.registerNode();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
// Containers with allocationTag 'foo' should be placed where
// containers with allocationTag 'bar' are already running
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
Collections.singletonMap(Collections.singleton("foo"),
PlacementConstraints.build(
PlacementConstraints.targetIn(NODE, allocationTag("bar")))));
am1.addSchedulingRequest(
Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
schedulingRequest(1, 2, 1, 512, "foo"),
schedulingRequest(1, 3, 1, 512, "foo"),
schedulingRequest(1, 4, 1, 512, "foo"),
schedulingRequest(1, 5, 1, 512, "foo")));
AllocateResponse allocResponse = am1.schedule(); // send the request
List<Container> allocatedContainers = new ArrayList<>();
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
Assert.assertEquals(5, allocatedContainers.size());
Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
.collect(Collectors.toSet());
// Ensure all containers end up on the same node (affinity)
Assert.assertEquals(1, nodeIds.size());
}
@Test(timeout = 300000)
public void testComplexPlacement() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm2.getNodeId(), nm2);
MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm3.getNodeId(), nm3);
MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm4.getNodeId(), nm4);
nm1.registerNode();
nm2.registerNode();
nm3.registerNode();
nm4.registerNode();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
// Containers with allocationTag 'bar' should not exceed 1 per NODE
constraintMap.put(Collections.singleton("bar"),
PlacementConstraints.build(targetNotIn(NODE, allocationTag("bar"))));
// Containers with allocationTag 'foo' should be placed where 'bar' exists
constraintMap.put(Collections.singleton("foo"),
PlacementConstraints.build(targetIn(NODE, allocationTag("bar"))));
// Containers with allocationTag 'foo' should not exceed 2 per NODE
constraintMap.put(Collections.singleton("foo"), PlacementConstraints
.build(targetCardinality(NODE, 0, 2, allocationTag("foo"))));
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, constraintMap);
am1.addSchedulingRequest(
Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
schedulingRequest(1, 2, 1, 512, "bar"),
schedulingRequest(1, 3, 1, 512, "foo"),
schedulingRequest(1, 4, 1, 512, "foo"),
schedulingRequest(1, 5, 1, 512, "foo"),
schedulingRequest(1, 6, 1, 512, "foo")));
AllocateResponse allocResponse = am1.schedule(); // send the request
List<Container> allocatedContainers = new ArrayList<>();
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 6);
Assert.assertEquals(6, allocatedContainers.size());
Map<NodeId, Long> nodeIdContainerIdMap =
allocatedContainers.stream().collect(
Collectors.groupingBy(c -> c.getNodeId(), Collectors.counting()));
// Ensure no more than 3 containers per node (1 'bar', 2 'foo')
for (NodeId n : nodeIdContainerIdMap.keySet()) {
Assert.assertTrue(nodeIdContainerIdMap.get(n) < 4);
}
}
@Test(timeout = 300000)
public void testSchedulerRejection() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();
@@ -174,6 +310,7 @@ public void testSchedulerRejection() throws Exception {
nm4.registerNode();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
// Containers with allocationTag 'foo' are restricted to 1 per NODE
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
Collections.singletonMap(
Collections.singleton("foo"),
@@ -196,7 +333,6 @@ public void testSchedulerRejection() throws Exception {
rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
// kick the scheduler
while (allocCount < 11) {
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
@@ -253,9 +389,10 @@ public void testRePlacementAfterSchedulerRejection() throws Exception {
nm2.registerNode();
nm3.registerNode();
nm4.registerNode();
// No not register nm5 yet..
// Do not register nm5 yet..
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
// Containers with allocationTag 'foo' are restricted to 1 per NODE
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
Collections.singletonMap(
Collections.singleton("foo"),
@@ -323,6 +460,7 @@ public void testPlacementRejection() throws Exception {
nm4.registerNode();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
// Containers with allocationTag 'foo' are restricted to 1 per NODE
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
Collections.singletonMap(
Collections.singleton("foo"),
@@ -346,7 +484,6 @@ public void testPlacementRejection() throws Exception {
rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
// kick the scheduler
while (allocCount < 11) {
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
@@ -373,6 +510,21 @@ public void testPlacementRejection() throws Exception {
rej.getReason());
}
private static void waitForContainerAllocation(Collection<MockNM> nodes,
MockAM am, List<Container> allocatedContainers, int containerNum)
throws Exception {
while (allocatedContainers.size() < containerNum) {
for (MockNM node : nodes) {
node.nodeHeartbeat(true);
}
LOG.info("Waiting for containers to be created for "
+ am.getApplicationAttemptId().getApplicationId() + "...");
sleep(1000);
AllocateResponse allocResponse = am.schedule();
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
}
}
protected static SchedulingRequest schedulingRequest(
int priority, long allocReqId, int cores, int mem, String... tags) {
return schedulingRequest(priority, allocReqId, cores, mem,