YARN-7653. Node group support for AllocationTagsManager. (Panagiotis Garefalakis via asuresh)

This commit is contained in:
Arun Suresh 2017-12-22 07:24:37 -08:00
parent 06eb63e64b
commit 37f1a7b64f
4 changed files with 391 additions and 162 deletions

View File

@ -496,7 +496,7 @@ protected RMNodeLabelsManager createNodeLabelManager()
}
protected AllocationTagsManager createAllocationTagsManager() {
return new AllocationTagsManager();
return new AllocationTagsManager(this.rmContext);
}
protected DelegationTokenRenewer createDelegationTokenRenewer() {

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.log4j.Logger;
import java.util.HashMap;
@ -38,9 +39,8 @@
import java.util.function.LongBinaryOperator;
/**
* Support storing maps between container-tags/applications and
* nodes. This will be required by affinity/anti-affinity implementation and
* cardinality.
* In-memory mapping between applications/container-tags and nodes/racks.
* Required by constrained affinity/anti-affinity and cardinality placement.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@ -51,48 +51,54 @@ public class AllocationTagsManager {
private ReentrantReadWriteLock.ReadLock readLock;
private ReentrantReadWriteLock.WriteLock writeLock;
private final RMContext rmContext;
// Application's tags to node
private Map<ApplicationId, NodeToCountedTags> perAppMappings =
// Application's tags to Node
private Map<ApplicationId, NodeToCountedTags> perAppNodeMappings =
new HashMap<>();
// Application's tags to Rack
private Map<ApplicationId, NodeToCountedTags> perAppRackMappings =
new HashMap<>();
// Global tags to node mapping (used to fast return aggregated tags
// cardinality across apps)
private NodeToCountedTags globalMapping = new NodeToCountedTags();
private NodeToCountedTags<NodeId> globalNodeMapping = new NodeToCountedTags();
// Global tags to Rack mapping
private NodeToCountedTags<String> globalRackMapping = new NodeToCountedTags();
/**
* Store node to counted tags.
* Generic store mapping type <T> to counted tags.
* Currently used both for NodeId to Tag, Count and Rack to Tag, Count
*/
@VisibleForTesting
static class NodeToCountedTags {
// Map<NodeId, Map<Tag, Count>>
private Map<NodeId, Map<String, Long>> nodeToTagsWithCount =
new HashMap<>();
static class NodeToCountedTags<T> {
// Map<Type, Map<Tag, Count>>
private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>();
// protected by external locks
private void addTagsToNode(NodeId nodeId, Set<String> tags) {
Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
k -> new HashMap<>());
private void addTags(T type, Set<String> tags) {
Map<String, Long> innerMap =
typeToTagsWithCount.computeIfAbsent(type, k -> new HashMap<>());
for (String tag : tags) {
Long count = innerMap.get(tag);
if (count == null) {
innerMap.put(tag, 1L);
} else{
} else {
innerMap.put(tag, count + 1);
}
}
}
// protected by external locks
private void addTagToNode(NodeId nodeId, String tag) {
Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
k -> new HashMap<>());
private void addTag(T type, String tag) {
Map<String, Long> innerMap =
typeToTagsWithCount.computeIfAbsent(type, k -> new HashMap<>());
Long count = innerMap.get(tag);
if (count == null) {
innerMap.put(tag, 1L);
} else{
} else {
innerMap.put(tag, count + 1);
}
}
@ -104,17 +110,17 @@ private void removeTagFromInnerMap(Map<String, Long> innerMap, String tag) {
} else {
if (count <= 0) {
LOG.warn(
"Trying to remove tags from node, however the count already"
"Trying to remove tags from node/rack, however the count already"
+ " becomes 0 or less, it could be a potential bug.");
}
innerMap.remove(tag);
}
}
private void removeTagsFromNode(NodeId nodeId, Set<String> tags) {
Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
private void removeTags(T type, Set<String> tags) {
Map<String, Long> innerMap = typeToTagsWithCount.get(type);
if (innerMap == null) {
LOG.warn("Failed to find node=" + nodeId
LOG.warn("Failed to find node/rack=" + type
+ " while trying to remove tags, please double check.");
return;
}
@ -124,14 +130,14 @@ private void removeTagsFromNode(NodeId nodeId, Set<String> tags) {
}
if (innerMap.isEmpty()) {
nodeToTagsWithCount.remove(nodeId);
typeToTagsWithCount.remove(type);
}
}
private void removeTagFromNode(NodeId nodeId, String tag) {
Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
private void removeTag(T type, String tag) {
Map<String, Long> innerMap = typeToTagsWithCount.get(type);
if (innerMap == null) {
LOG.warn("Failed to find node=" + nodeId
LOG.warn("Failed to find node/rack=" + type
+ " while trying to remove tags, please double check.");
return;
}
@ -139,12 +145,12 @@ private void removeTagFromNode(NodeId nodeId, String tag) {
removeTagFromInnerMap(innerMap, tag);
if (innerMap.isEmpty()) {
nodeToTagsWithCount.remove(nodeId);
typeToTagsWithCount.remove(type);
}
}
private long getCardinality(NodeId nodeId, String tag) {
Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
private long getCardinality(T type, String tag) {
Map<String, Long> innerMap = typeToTagsWithCount.get(type);
if (innerMap == null) {
return 0;
}
@ -152,9 +158,9 @@ private long getCardinality(NodeId nodeId, String tag) {
return value == null ? 0 : value;
}
private long getCardinality(NodeId nodeId, Set<String> tags,
private long getCardinality(T type, Set<String> tags,
LongBinaryOperator op) {
Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
Map<String, Long> innerMap = typeToTagsWithCount.get(type);
if (innerMap == null) {
return 0;
}
@ -193,29 +199,40 @@ private long getCardinality(NodeId nodeId, Set<String> tags,
}
private boolean isEmpty() {
return nodeToTagsWithCount.isEmpty();
return typeToTagsWithCount.isEmpty();
}
@VisibleForTesting
public Map<NodeId, Map<String, Long>> getNodeToTagsWithCount() {
return nodeToTagsWithCount;
public Map<T, Map<String, Long>> getTypeToTagsWithCount() {
return typeToTagsWithCount;
}
}
@VisibleForTesting
Map<ApplicationId, NodeToCountedTags> getPerAppMappings() {
return perAppMappings;
Map<ApplicationId, NodeToCountedTags> getPerAppNodeMappings() {
return perAppNodeMappings;
}
@VisibleForTesting
NodeToCountedTags getGlobalMapping() {
return globalMapping;
Map<ApplicationId, NodeToCountedTags> getPerAppRackMappings() {
return perAppRackMappings;
}
public AllocationTagsManager() {
@VisibleForTesting
NodeToCountedTags getGlobalNodeMapping() {
return globalNodeMapping;
}
@VisibleForTesting
NodeToCountedTags getGlobalRackMapping() {
return globalRackMapping;
}
public AllocationTagsManager(RMContext context) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
rmContext = context;
}
/**
@ -243,21 +260,30 @@ public void addContainer(NodeId nodeId, ApplicationId applicationId,
writeLock.lock();
try {
NodeToCountedTags perAppTagsMapping = perAppMappings.computeIfAbsent(
applicationId, k -> new NodeToCountedTags());
NodeToCountedTags perAppTagsMapping = perAppNodeMappings
.computeIfAbsent(applicationId, k -> new NodeToCountedTags());
NodeToCountedTags perAppRackTagsMapping = perAppRackMappings
.computeIfAbsent(applicationId, k -> new NodeToCountedTags());
// Covering test-cases where context is mocked
String nodeRack = (rmContext.getRMNodes() != null
&& rmContext.getRMNodes().get(nodeId) != null)
? rmContext.getRMNodes().get(nodeId).getRackName()
: "default-rack";
if (useSet) {
perAppTagsMapping.addTagsToNode(nodeId, allocationTags);
globalMapping.addTagsToNode(nodeId, allocationTags);
perAppTagsMapping.addTags(nodeId, allocationTags);
perAppRackTagsMapping.addTags(nodeRack, allocationTags);
globalNodeMapping.addTags(nodeId, allocationTags);
globalRackMapping.addTags(nodeRack, allocationTags);
} else {
perAppTagsMapping.addTagToNode(nodeId, applicationIdTag);
globalMapping.addTagToNode(nodeId, applicationIdTag);
perAppTagsMapping.addTag(nodeId, applicationIdTag);
perAppRackTagsMapping.addTag(nodeRack, applicationIdTag);
globalNodeMapping.addTag(nodeId, applicationIdTag);
globalRackMapping.addTag(nodeRack, applicationIdTag);
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Added container=" + containerId + " with tags=[" + StringUtils
.join(allocationTags, ",") + "]");
LOG.debug("Added container=" + containerId + " with tags=["
+ StringUtils.join(allocationTags, ",") + "]");
}
} finally {
writeLock.unlock();
@ -287,27 +313,40 @@ public void removeContainer(NodeId nodeId, ApplicationId applicationId,
writeLock.lock();
try {
NodeToCountedTags perAppTagsMapping = perAppMappings.get(applicationId);
NodeToCountedTags perAppTagsMapping =
perAppNodeMappings.get(applicationId);
NodeToCountedTags perAppRackTagsMapping =
perAppRackMappings.get(applicationId);
if (perAppTagsMapping == null) {
return;
}
// Covering test-cases where context is mocked
String nodeRack = (rmContext.getRMNodes() != null
&& rmContext.getRMNodes().get(nodeId) != null)
? rmContext.getRMNodes().get(nodeId).getRackName()
: "default-rack";
if (useSet) {
perAppTagsMapping.removeTagsFromNode(nodeId, allocationTags);
globalMapping.removeTagsFromNode(nodeId, allocationTags);
perAppTagsMapping.removeTags(nodeId, allocationTags);
perAppRackTagsMapping.removeTags(nodeRack, allocationTags);
globalNodeMapping.removeTags(nodeId, allocationTags);
globalRackMapping.removeTags(nodeRack, allocationTags);
} else {
perAppTagsMapping.removeTagFromNode(nodeId, applicationIdTag);
globalMapping.removeTagFromNode(nodeId, applicationIdTag);
perAppTagsMapping.removeTag(nodeId, applicationIdTag);
perAppRackTagsMapping.removeTag(nodeRack, applicationIdTag);
globalNodeMapping.removeTag(nodeId, applicationIdTag);
globalRackMapping.removeTag(nodeRack, applicationIdTag);
}
if (perAppTagsMapping.isEmpty()) {
perAppMappings.remove(applicationId);
perAppNodeMappings.remove(applicationId);
}
if (perAppRackTagsMapping.isEmpty()) {
perAppRackMappings.remove(applicationId);
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Removed container=" + containerId + " with tags=[" + StringUtils
.join(allocationTags, ",") + "]");
LOG.debug("Removed container=" + containerId + " with tags=["
+ StringUtils.join(allocationTags, ",") + "]");
}
} finally {
writeLock.unlock();
@ -315,18 +354,16 @@ public void removeContainer(NodeId nodeId, ApplicationId applicationId,
}
/**
* Get cardinality for following conditions. External can pass-in a binary op
* to implement customized logic. *
* Get Node cardinality for a specific tag.
* When applicationId is null, method returns aggregated cardinality
*
* @param nodeId nodeId, required.
* @param applicationId applicationId. When null is specified, return
* aggregated cardinality among all nodes.
* @param tag allocation tag, see
* {@link SchedulingRequest#getAllocationTags()},
* When multiple tags specified. Returns cardinality
* depends on op. If a specified tag doesn't exist,
* 0 will be its cardinality.
* When null/empty tags specified, all tags
* (of the node/app) will be considered.
* If a specified tag doesn't exist,
* method returns 0.
* @return cardinality of specified query on the node.
* @throws InvalidAllocationTagsQueryException when illegal query
* parameter specified
@ -338,14 +375,14 @@ public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId,
try {
if (nodeId == null) {
throw new InvalidAllocationTagsQueryException(
"Must specify nodeId/tags/op to query cardinality");
"Must specify nodeId/tag to query cardinality");
}
NodeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppMappings.get(applicationId);
} else{
mapping = globalMapping;
mapping = perAppNodeMappings.get(applicationId);
} else {
mapping = globalNodeMapping;
}
if (mapping == null) {
@ -358,12 +395,55 @@ public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId,
}
}
/**
* Get Rack cardinality for a specific tag.
*
* @param rack rack, required.
* @param applicationId applicationId. When null is specified, return
* aggregated cardinality among all nodes.
* @param tag allocation tag, see
* {@link SchedulingRequest#getAllocationTags()},
* If a specified tag doesn't exist,
* method returns 0.
* @return cardinality of specified query on the rack.
* @throws InvalidAllocationTagsQueryException when illegal query
* parameter specified
*/
public long getRackCardinality(String rack, ApplicationId applicationId,
String tag) throws InvalidAllocationTagsQueryException {
readLock.lock();
try {
if (rack == null) {
throw new InvalidAllocationTagsQueryException(
"Must specify rack/tag to query cardinality");
}
NodeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppRackMappings.get(applicationId);
} else {
mapping = globalRackMapping;
}
if (mapping == null) {
return 0;
}
return mapping.getCardinality(rack, tag);
} finally {
readLock.unlock();
}
}
/**
* Check if given tag exists on node.
*
* @param nodeId nodeId, required.
* @param applicationId applicationId. When null is specified, return
* aggregated cardinality among all nodes.
* aggregation among all applications.
* @param tag allocation tag, see
* {@link SchedulingRequest#getAllocationTags()},
* When multiple tags specified. Returns cardinality
@ -387,7 +467,7 @@ public boolean allocationTagExistsOnNode(NodeId nodeId,
*
* @param nodeId nodeId, required.
* @param applicationId applicationId. When null is specified, return
* aggregated cardinality among all nodes.
* aggregated cardinality among all applications.
* @param tags allocation tags, see
* {@link SchedulingRequest#getAllocationTags()},
* When multiple tags specified. Returns cardinality
@ -396,7 +476,7 @@ public boolean allocationTagExistsOnNode(NodeId nodeId,
* specified, all tags (of the node/app) will be
* considered.
* @param op operator. Such as Long::max, Long::sum, etc. Required.
* This sparameter only take effect when #values >= 2.
* This parameter only take effect when #values >= 2.
* @return cardinality of specified query on the node.
* @throws InvalidAllocationTagsQueryException when illegal query
* parameter specified
@ -414,9 +494,9 @@ public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId,
NodeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppMappings.get(applicationId);
} else{
mapping = globalMapping;
mapping = perAppNodeMappings.get(applicationId);
} else {
mapping = globalNodeMapping;
}
if (mapping == null) {
@ -428,4 +508,52 @@ public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId,
readLock.unlock();
}
}
/**
* Get cardinality for following conditions. External can pass-in a binary op
* to implement customized logic.
*
* @param rack rack, required.
* @param applicationId applicationId. When null is specified, return
* aggregated cardinality among all applications.
* @param tags allocation tags, see
* {@link SchedulingRequest#getAllocationTags()},
* When multiple tags specified. Returns cardinality
* depends on op. If a specified tag doesn't exist, 0
* will be its cardinality. When null/empty tags
* specified, all tags (of the rack/app) will be
* considered.
* @param op operator. Such as Long::max, Long::sum, etc. Required.
* This parameter only take effect when #values >= 2.
* @return cardinality of specified query on the rack.
* @throws InvalidAllocationTagsQueryException when illegal query
* parameter specified
*/
public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
Set<String> tags, LongBinaryOperator op)
throws InvalidAllocationTagsQueryException {
readLock.lock();
try {
if (rack == null || op == null) {
throw new InvalidAllocationTagsQueryException(
"Must specify rack/tags/op to query cardinality");
}
NodeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppRackMappings.get(applicationId);
} else {
mapping = globalRackMapping;
}
if (mapping == null) {
return 0;
}
return mapping.getCardinality(rack, tags, op);
} finally {
readLock.unlock();
}
}
}

View File

@ -405,8 +405,8 @@ public void testContainerTransitionNotifyPlacementTagsManager()
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
AllocationTagsManager tagsManager = new AllocationTagsManager();
RMContext rmContext = mock(RMContext.class);
AllocationTagsManager tagsManager = new AllocationTagsManager(rmContext);
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);

View File

@ -20,202 +20,300 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
/**
* Test functionality of AllocationTagsManager.
*/
public class TestAllocationTagsManager {
private RMContext rmContext;
@Before
public void setup() {
MockRM rm = new MockRM();
rm.start();
MockNodes.resetHostIds();
List<RMNode> rmNodes =
MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
for (RMNode rmNode : rmNodes) {
rm.getRMContext().getRMNodes().putIfAbsent(rmNode.getNodeID(), rmNode);
}
rmContext = rm.getRMContext();
}
@Test
public void testAllocationTagsManagerSimpleCases()
throws InvalidAllocationTagsQueryException {
AllocationTagsManager atm = new AllocationTagsManager();
AllocationTagsManager atm = new AllocationTagsManager(rmContext);
/**
* Construct test case:
* Node1:
* Node1 (rack0):
* container_1_1 (mapper/reducer/app_1)
* container_1_3 (service/app_1)
*
* Node2:
* Node2 (rack0):
* container_1_2 (mapper/reducer/app_1)
* container_1_4 (reducer/app_1)
* container_2_1 (service/app_2)
*/
// 3 Containers from app1
atm.addContainer(NodeId.fromString("node1:1234"),
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("node2:1234"),
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("node1:1234"),
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("node2:1234"),
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
// 1 Container from app2
atm.addContainer(NodeId.fromString("node2:1234"),
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Get Cardinality of app1 on node1, with tag "mapper"
// Get Node Cardinality of app1 on node1, with tag "mapper"
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::max));
// Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min
// Get Rack Cardinality of app1 on rack0, with tag "mapper"
Assert.assertEquals(2, atm.getRackCardinality("rack0",
TestUtils.getMockApplicationId(1), "mapper"));
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::min));
// Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max
Assert.assertEquals(2,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::max));
// Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
Assert.assertEquals(3,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::sum));
// Get Cardinality by passing single tag.
// Get Node Cardinality by passing single tag.
Assert.assertEquals(1,
atm.getNodeCardinality(NodeId.fromString("node2:1234"),
atm.getNodeCardinality(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), "mapper"));
Assert.assertEquals(2,
atm.getNodeCardinality(NodeId.fromString("node2:1234"),
atm.getNodeCardinality(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), "reducer"));
// Get Cardinality of app1 on node2, with tag "no_existed/reducer", op=min
// Get Node Cardinality of app1 on node2, with tag "no_existed/reducer",
// op=min
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("no_existed", "reducer"), Long::min));
// Get Cardinality of app1 on node2, with tag "<applicationId>", op=max
// Get Node Cardinality of app1 on node2, with tag "<applicationId>", op=max
// (Expect this returns #containers from app1 on node2)
Assert.assertEquals(2,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), ImmutableSet
.of(AllocationTagsNamespaces.APP_ID + TestUtils
.getMockApplicationId(1).toString()), Long::max));
Assert
.assertEquals(2,
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of(AllocationTagsNamespaces.APP_ID
+ TestUtils.getMockApplicationId(1).toString()),
Long::max));
// Get Cardinality of app1 on node2, with empty tag set, op=max
// Get Node Cardinality of app1 on node2, with empty tag set, op=max
Assert.assertEquals(2,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
// Get Cardinality of all apps on node2, with empty tag set, op=sum
Assert.assertEquals(7,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null,
ImmutableSet.of(), Long::sum));
// Get Node Cardinality of all apps on node2, with empty tag set, op=sum
Assert.assertEquals(7, atm.getNodeCardinalityByOp(
NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum));
// Get Cardinality of app_1 on node2, with empty tag set, op=sum
// Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
Assert.assertEquals(5,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
// Get Cardinality of app_1 on node2, with empty tag set, op=sum
// Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
Assert.assertEquals(2,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
// Finish all containers:
atm.removeContainer(NodeId.fromString("node1:1234"),
atm.removeContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("node2:1234"),
atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("node1:1234"),
atm.removeContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
atm.removeContainer(NodeId.fromString("node2:1234"),
atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
atm.removeContainer(NodeId.fromString("node2:1234"),
atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Expect all cardinality to be 0
// Get Cardinality of app1 on node1, with tag "mapper"
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::max));
// Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::min));
// Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::max));
// Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::sum));
// Get Cardinality of app1 on node2, with tag "<applicationId>", op=max
// Get Node Cardinality of app1 on node2, with tag "<applicationId>", op=max
// (Expect this returns #containers from app1 on node2)
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of(TestUtils.getMockApplicationId(1).toString()),
Long::max));
Assert.assertEquals(0,
atm.getNodeCardinality(NodeId.fromString("node2:1234"),
atm.getNodeCardinality(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
TestUtils.getMockApplicationId(1).toString()));
// Get Cardinality of app1 on node2, with empty tag set, op=max
// Get Node Cardinality of app1 on node2, with empty tag set, op=max
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
// Get Cardinality of all apps on node2, with empty tag set, op=sum
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null,
ImmutableSet.of(), Long::sum));
// Get Node Cardinality of all apps on node2, with empty tag set, op=sum
Assert.assertEquals(0, atm.getNodeCardinalityByOp(
NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum));
// Get Cardinality of app_1 on node2, with empty tag set, op=sum
// Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
// Get Cardinality of app_1 on node2, with empty tag set, op=sum
// Get Node Cardinality of app_2 on node2, with empty tag set, op=sum
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
}
@Test
public void testAllocationTagsManagerRackMapping()
throws InvalidAllocationTagsQueryException {
AllocationTagsManager atm = new AllocationTagsManager(rmContext);
/**
* Construct Rack test case:
* Node1 (rack0):
* container_1_1 (mapper/reducer/app_1)
* container_1_4 (reducer/app_2)
*
* Node2 (rack0):
* container_1_2 (mapper/reducer/app_2)
* container_1_3 (service/app_1)
*
* Node5 (rack1):
* container_2_1 (service/app_2)
*/
// 3 Containers from app1
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 4),
ImmutableSet.of("reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
// 1 Container from app2
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Get Rack Cardinality of app1 on rack0, with tag "mapper"
Assert.assertEquals(1, atm.getRackCardinality("rack0",
TestUtils.getMockApplicationId(1), "mapper"));
// Get Rack Cardinality of app2 on rack0, with tag "reducer"
Assert.assertEquals(2, atm.getRackCardinality("rack0",
TestUtils.getMockApplicationId(2), "reducer"));
// Get Rack Cardinality of all apps on rack0, with tag "reducer"
Assert.assertEquals(3, atm.getRackCardinality("rack0", null, "reducer"));
// Get Rack Cardinality of app_1 on rack0, with empty tag set, op=max
Assert.assertEquals(2, atm.getRackCardinalityByOp("rack0",
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
// Get Rack Cardinality of app_1 on rack0, with empty tag set, op=min
Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0",
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::min));
// Get Rack Cardinality of all apps on rack0, with empty tag set, op=min
Assert.assertEquals(3, atm.getRackCardinalityByOp("rack0", null,
ImmutableSet.of(), Long::max));
}
@Test
public void testAllocationTagsManagerMemoryAfterCleanup()
throws InvalidAllocationTagsQueryException {
@ -223,54 +321,57 @@ public void testAllocationTagsManagerMemoryAfterCleanup()
* Make sure YARN cleans up all memory once container/app finishes.
*/
AllocationTagsManager atm = new AllocationTagsManager();
AllocationTagsManager atm = new AllocationTagsManager(rmContext);
// Add a bunch of containers
atm.addContainer(NodeId.fromString("node1:1234"),
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("node2:1234"),
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("node1:1234"),
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("node2:1234"),
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
atm.addContainer(NodeId.fromString("node2:1234"),
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Remove all these containers
atm.removeContainer(NodeId.fromString("node1:1234"),
atm.removeContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("node2:1234"),
atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("node1:1234"),
atm.removeContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
atm.removeContainer(NodeId.fromString("node2:1234"),
atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
atm.removeContainer(NodeId.fromString("node2:1234"),
atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Check internal data structure
Assert.assertEquals(0,
atm.getGlobalMapping().getNodeToTagsWithCount().size());
Assert.assertEquals(0, atm.getPerAppMappings().size());
atm.getGlobalNodeMapping().getTypeToTagsWithCount().size());
Assert.assertEquals(0, atm.getPerAppNodeMappings().size());
Assert.assertEquals(0,
atm.getGlobalRackMapping().getTypeToTagsWithCount().size());
Assert.assertEquals(0, atm.getPerAppRackMappings().size());
}
@Test
@ -280,26 +381,26 @@ public void testQueryCardinalityWithIllegalParameters()
* Make sure YARN cleans up all memory once container/app finishes.
*/
AllocationTagsManager atm = new AllocationTagsManager();
AllocationTagsManager atm = new AllocationTagsManager(rmContext);
// Add a bunch of containers
atm.addContainer(NodeId.fromString("node1:1234"),
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("node2:1234"),
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("node1:1234"),
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("node2:1234"),
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
atm.addContainer(NodeId.fromString("node2:1234"),
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
@ -317,7 +418,7 @@ public void testQueryCardinalityWithIllegalParameters()
// No op
caughtException = false;
try {
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("mapper"), null);
} catch (InvalidAllocationTagsQueryException e) {
caughtException = true;