YARN-3075. NodeLabelsManager implementation to retrieve label to node mapping (Varun Saxena via wangda)

This commit is contained in:
Wangda Tan 2015-02-03 12:51:21 -08:00
parent 843806d03a
commit 5bd984691b
8 changed files with 323 additions and 10 deletions

View File

@ -234,6 +234,9 @@ Release 2.7.0 - UNRELEASED
YARN-3098. Created common QueueCapacities class in Capacity Scheduler to
track capacities-by-labels of queues. (Wangda Tan via jianhe)
YARN-3075. NodeLabelsManager implementation to retrieve label to node
mapping (Varun Saxena via wangda)
OPTIMIZATIONS
BUG FIXES

View File

@ -121,15 +121,17 @@ protected static class Node {
public Set<String> labels;
public Resource resource;
public boolean running;
public NodeId nodeId;
protected Node() {
protected Node(NodeId nodeid) {
labels = null;
resource = Resource.newInstance(0, 0);
running = false;
nodeId = nodeid;
}
public Node copy() {
Node c = new Node();
Node c = new Node(nodeId);
if (labels != null) {
c.labels =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
@ -466,6 +468,26 @@ protected void checkRemoveLabelsFromNode(
}
}
private void addNodeToLabels(NodeId node, Set<String> labels) {
for(String l : labels) {
labelCollections.get(l).addNodeId(node);
}
}
private void removeNodeFromLabels(NodeId node, Set<String> labels) {
for(String l : labels) {
labelCollections.get(l).removeNodeId(node);
}
}
private void replaceNodeForLabels(NodeId node, Set<String> oldLabels,
Set<String> newLabels) {
if(oldLabels != null) {
removeNodeFromLabels(node, oldLabels);
}
addNodeToLabels(node, newLabels);
}
@SuppressWarnings("unchecked")
protected void internalUpdateLabelsOnNodes(
Map<NodeId, Set<String>> nodeToLabels, NodeLabelUpdateOperation op)
@ -473,6 +495,7 @@ protected void internalUpdateLabelsOnNodes(
// do update labels from nodes
Map<NodeId, Set<String>> newNMToLabels =
new HashMap<NodeId, Set<String>>();
Set<String> oldLabels;
for (Entry<NodeId, Set<String>> entry : nodeToLabels.entrySet()) {
NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue();
@ -482,25 +505,31 @@ protected void internalUpdateLabelsOnNodes(
Host host = nodeCollections.get(nodeId.getHost());
switch (op) {
case REMOVE:
removeNodeFromLabels(nodeId, labels);
host.labels.removeAll(labels);
for (Node node : host.nms.values()) {
if (node.labels != null) {
node.labels.removeAll(labels);
}
removeNodeFromLabels(node.nodeId, labels);
}
break;
case ADD:
addNodeToLabels(nodeId, labels);
host.labels.addAll(labels);
for (Node node : host.nms.values()) {
if (node.labels != null) {
node.labels.addAll(labels);
}
addNodeToLabels(node.nodeId, labels);
}
break;
case REPLACE:
replaceNodeForLabels(nodeId, host.labels, labels);
host.labels.clear();
host.labels.addAll(labels);
for (Node node : host.nms.values()) {
replaceNodeForLabels(node.nodeId, node.labels, labels);
node.labels = null;
}
break;
@ -514,14 +543,20 @@ protected void internalUpdateLabelsOnNodes(
// Add and replace
createNodeIfNonExisted(nodeId);
Node nm = getNMInNodeSet(nodeId);
switch (op) {
case ADD:
addNodeToLabels(nodeId, labels);
if (nm.labels == null) {
nm.labels = new HashSet<String>();
}
switch (op) {
case ADD:
nm.labels.addAll(labels);
break;
case REPLACE:
oldLabels = getLabelsByNode(nodeId);
replaceNodeForLabels(nodeId, oldLabels, labels);
if (nm.labels == null) {
nm.labels = new HashSet<String>();
}
nm.labels.clear();
nm.labels.addAll(labels);
break;
@ -531,6 +566,7 @@ protected void internalUpdateLabelsOnNodes(
newNMToLabels.put(nodeId, nm.labels);
} else {
// remove
removeNodeFromLabels(nodeId, labels);
Node nm = getNMInNodeSet(nodeId);
if (nm.labels != null) {
nm.labels.removeAll(labels);
@ -646,6 +682,52 @@ public Map<NodeId, Set<String>> getNodeLabels() {
}
}
/**
* Get mapping of labels to nodes for all the labels.
*
* @return labels to nodes map
*/
public Map<String, Set<NodeId>> getLabelsToNodes() {
try {
readLock.lock();
return getLabelsToNodes(labelCollections.keySet());
} finally {
readLock.unlock();
}
}
/**
* Get mapping of labels to nodes for specified set of labels.
*
* @param labels set of labels for which labels to nodes mapping will be
* returned.
* @return labels to nodes map
*/
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels) {
try {
readLock.lock();
Map<String, Set<NodeId>> labelsToNodes =
new HashMap<String, Set<NodeId>>();
for (String label : labels) {
if(label.equals(NO_LABEL)) {
continue;
}
NodeLabel nodeLabelInfo = labelCollections.get(label);
if(nodeLabelInfo != null) {
Set<NodeId> nodeIds = nodeLabelInfo.getAssociatedNodeIds();
if (!nodeIds.isEmpty()) {
labelsToNodes.put(label, nodeIds);
}
} else {
LOG.warn("getLabelsToNodes : Label [" + label + "] cannot be found");
}
}
return Collections.unmodifiableMap(labelsToNodes);
} finally {
readLock.unlock();
}
}
/**
* Get existing valid labels in repository
*
@ -741,7 +823,7 @@ protected void createNodeIfNonExisted(NodeId nodeId) throws IOException {
}
Node nm = host.nms.get(nodeId);
if (null == nm) {
host.nms.put(nodeId, new Node());
host.nms.put(nodeId, new Node(nodeId));
}
}

View File

@ -18,7 +18,10 @@
package org.apache.hadoop.yarn.nodelabels;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -26,6 +29,7 @@ public class NodeLabel implements Comparable<NodeLabel> {
private Resource resource;
private int numActiveNMs;
private String labelName;
private Set<NodeId> nodeIds;
public NodeLabel(String labelName) {
this(labelName, Resource.newInstance(0, 0), 0);
@ -35,6 +39,19 @@ protected NodeLabel(String labelName, Resource res, int activeNMs) {
this.labelName = labelName;
this.resource = res;
this.numActiveNMs = activeNMs;
this.nodeIds = new HashSet<NodeId>();
}
public void addNodeId(NodeId node) {
nodeIds.add(node);
}
public void removeNodeId(NodeId node) {
nodeIds.remove(node);
}
public Set<NodeId> getAssociatedNodeIds() {
return new HashSet<NodeId>(nodeIds);
}
public void addNode(Resource nodeRes) {

View File

@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.nodelabels;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.junit.Assert;
@ -39,6 +41,37 @@ public static void assertMapEquals(Map<NodeId, Set<String>> m1,
}
}
public static void assertLabelsToNodesEquals(Map<String, Set<NodeId>> m1,
ImmutableMap<String, Set<NodeId>> m2) {
Assert.assertEquals(m1.size(), m2.size());
for (String k : m1.keySet()) {
Assert.assertTrue(m2.containsKey(k));
Set<NodeId> s1 = new HashSet<NodeId>(m1.get(k));
Set<NodeId> s2 = new HashSet<NodeId>(m2.get(k));
Assert.assertEquals(s1, s2);
Assert.assertTrue(s1.containsAll(s2));
}
}
public static ImmutableMap<String, Set<NodeId>> transposeNodeToLabels(
Map<NodeId, Set<String>> mapNodeToLabels) {
Map<String, Set<NodeId>> mapLabelsToNodes =
new HashMap<String, Set<NodeId>>();
for(Entry<NodeId, Set<String>> entry : mapNodeToLabels.entrySet()) {
NodeId node = entry.getKey();
Set<String> setLabels = entry.getValue();
for(String label : setLabels) {
Set<NodeId> setNode = mapLabelsToNodes.get(label);
if (setNode == null) {
setNode = new HashSet<NodeId>();
}
setNode.add(NodeId.newInstance(node.getHost(), node.getPort()));
mapLabelsToNodes.put(label, setNode);
}
}
return ImmutableMap.copyOf(mapLabelsToNodes);
}
public static void assertMapContains(Map<NodeId, Set<String>> m1,
ImmutableMap<NodeId, Set<String>> m2) {
for (NodeId k : m2.keySet()) {

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.nodelabels;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
@ -310,7 +312,6 @@ public void testReplaceLabelsOnHostsShouldUpdateNodesBelongTo()
// Set labels on n1:1 to P2 again to verify if add/remove works
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p2")));
// Add p3 to n1, should makes n1:1 to be p2/p3, and n1:2 to be p1/p3
mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p3")));
assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
@ -398,4 +399,123 @@ public void testNodeLabelsDisabled() throws IOException {
mgr.close();
}
@Test(timeout = 5000)
public void testLabelsToNodes()
throws IOException {
mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p1", "p2")));
Map<String, Set<NodeId>> labelsToNodes = mgr.getLabelsToNodes();
assertLabelsToNodesEquals(
labelsToNodes,
ImmutableMap.of(
"p1", toSet(toNodeId("n1")),
"p2",toSet(toNodeId("n1"))));
assertLabelsToNodesEquals(
labelsToNodes, transposeNodeToLabels(mgr.getNodeLabels()));
// Replace labels on n1:1 to P2
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p2"),
toNodeId("n1:2"), toSet("p2")));
labelsToNodes = mgr.getLabelsToNodes();
assertLabelsToNodesEquals(
labelsToNodes,
ImmutableMap.of(
"p1", toSet(toNodeId("n1")),
"p2", toSet(toNodeId("n1"),toNodeId("n1:1"),toNodeId("n1:2"))));
assertLabelsToNodesEquals(
labelsToNodes, transposeNodeToLabels(mgr.getNodeLabels()));
// Replace labels on n1 to P1, both n1:1/n1 will be P1 now
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
labelsToNodes = mgr.getLabelsToNodes();
assertLabelsToNodesEquals(
labelsToNodes,
ImmutableMap.of(
"p1", toSet(toNodeId("n1"),toNodeId("n1:1"),toNodeId("n1:2"))));
assertLabelsToNodesEquals(
labelsToNodes, transposeNodeToLabels(mgr.getNodeLabels()));
// Set labels on n1:1 to P2 again to verify if add/remove works
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p2")));
// Add p3 to n1, should makes n1:1 to be p2/p3, and n1:2 to be p1/p3
mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p3")));
labelsToNodes = mgr.getLabelsToNodes();
assertLabelsToNodesEquals(
labelsToNodes,
ImmutableMap.of(
"p1", toSet(toNodeId("n1"),toNodeId("n1:2")),
"p2", toSet(toNodeId("n1:1")),
"p3", toSet(toNodeId("n1"),toNodeId("n1:1"),toNodeId("n1:2"))));
assertLabelsToNodesEquals(
labelsToNodes, transposeNodeToLabels(mgr.getNodeLabels()));
// Remove P3 from n1, should makes n1:1 to be p2, and n1:2 to be p1
mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p3")));
labelsToNodes = mgr.getLabelsToNodes();
assertLabelsToNodesEquals(
labelsToNodes,
ImmutableMap.of(
"p1", toSet(toNodeId("n1"),toNodeId("n1:2")),
"p2", toSet(toNodeId("n1:1"))));
assertLabelsToNodesEquals(
labelsToNodes, transposeNodeToLabels(mgr.getNodeLabels()));
}
@Test(timeout = 5000)
public void testLabelsToNodesForSelectedLabels()
throws IOException {
mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
mgr.addLabelsToNode(
ImmutableMap.of(
toNodeId("n1:1"), toSet("p1", "p2"),
toNodeId("n1:2"), toSet("p1", "p2")));
Set<String> setlabels =
new HashSet<String>(Arrays.asList(new String[]{"p1"}));
assertLabelsToNodesEquals(mgr.getLabelsToNodes(setlabels),
ImmutableMap.of("p1", toSet(toNodeId("n1:1"),toNodeId("n1:2"))));
// Replace labels on n1:1 to P2
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p3")));
assertTrue(mgr.getLabelsToNodes(setlabels).isEmpty());
setlabels = new HashSet<String>(Arrays.asList(new String[]{"p2", "p3"}));
assertLabelsToNodesEquals(
mgr.getLabelsToNodes(setlabels),
ImmutableMap.of(
"p3", toSet(toNodeId("n1"), toNodeId("n1:1"),toNodeId("n1:2"))));
mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1:3"), toSet("p1", "p2")));
assertLabelsToNodesEquals(
mgr.getLabelsToNodes(setlabels),
ImmutableMap.of(
"p2", toSet(toNodeId("n1:3")),
"p3", toSet(toNodeId("n1"), toNodeId("n1:1"),toNodeId("n1:2"))));
mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p3")));
setlabels =
new HashSet<String>(Arrays.asList(new String[]{"p1", "p2", "p3"}));
assertLabelsToNodesEquals(
mgr.getLabelsToNodes(setlabels),
ImmutableMap.of(
"p1", toSet(toNodeId("n1:3")),
"p2", toSet(toNodeId("n1:3"))));
mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n2:2"), toSet("p1", "p2")));
assertLabelsToNodesEquals(
mgr.getLabelsToNodes(setlabels),
ImmutableMap.of(
"p1", toSet(toNodeId("n1:3"), toNodeId("n2:2")),
"p2", toSet(toNodeId("n1:3"), toNodeId("n2:2"))));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n2:2"), toSet("p3")));
assertLabelsToNodesEquals(
mgr.getLabelsToNodes(setlabels),
ImmutableMap.of(
"p1", toSet(toNodeId("n1:3")),
"p2", toSet(toNodeId("n1:3")),
"p3", toSet(toNodeId("n2:2"))));
setlabels = new HashSet<String>(Arrays.asList(new String[]{"p1"}));
assertLabelsToNodesEquals(mgr.getLabelsToNodes(setlabels),
ImmutableMap.of("p1", toSet(toNodeId("n1:3"))));
}
}

View File

@ -116,6 +116,11 @@ public void testRecoverWithMirror() throws Exception {
assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
toNodeId("n7"), toSet("p6")));
assertLabelsToNodesEquals(mgr.getLabelsToNodes(),
ImmutableMap.of(
"p6", toSet(toNodeId("n6"), toNodeId("n7")),
"p4", toSet(toNodeId("n4")),
"p2", toSet(toNodeId("n2"))));
// stutdown mgr and start a new mgr
mgr.stop();
@ -130,6 +135,11 @@ public void testRecoverWithMirror() throws Exception {
assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
toNodeId("n7"), toSet("p6")));
assertLabelsToNodesEquals(mgr.getLabelsToNodes(),
ImmutableMap.of(
"p6", toSet(toNodeId("n6"), toNodeId("n7")),
"p4", toSet(toNodeId("n4")),
"p2", toSet(toNodeId("n2"))));
mgr.stop();
}
@ -169,6 +179,11 @@ public void testEditlogRecover() throws Exception {
assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
toNodeId("n7"), toSet("p6")));
assertLabelsToNodesEquals(mgr.getLabelsToNodes(),
ImmutableMap.of(
"p6", toSet(toNodeId("n6"), toNodeId("n7")),
"p4", toSet(toNodeId("n4")),
"p2", toSet(toNodeId("n2"))));
mgr.stop();
}
@ -218,6 +233,11 @@ public void testSerilizationAfterRecovery() throws Exception {
assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
toNodeId("n7"), toSet("p6")));
assertLabelsToNodesEquals(mgr.getLabelsToNodes(),
ImmutableMap.of(
"p6", toSet(toNodeId("n6"), toNodeId("n7")),
"p4", toSet(toNodeId("n4")),
"p2", toSet(toNodeId("n2"))));
/*
* Add label p7,p8 then shutdown

View File

@ -201,6 +201,17 @@ public void activateNode(NodeId nodeId, Resource resource) {
nm.resource = resource;
nm.running = true;
// Add node in labelsCollection
Set<String> labelsForNode = getLabelsByNode(nodeId);
if (labelsForNode != null) {
for (String label : labelsForNode) {
NodeLabel labelInfo = labelCollections.get(label);
if(labelInfo != null) {
labelInfo.addNodeId(nodeId);
}
}
}
// get the node after edition
Map<String, Host> after = cloneNodeMap(ImmutableSet.of(nodeId));

View File

@ -464,4 +464,31 @@ public void testPullRMNodeLabelsInfo() throws IOException {
checkNodeLabelInfo(infos, "y", 1, 10);
checkNodeLabelInfo(infos, "z", 0, 0);
}
@Test(timeout = 5000)
public void testLabelsToNodesOnNodeActiveDeactive() throws Exception {
// Activate a node without assigning any labels
mgr.activateNode(NodeId.newInstance("n1", 1), Resource.newInstance(10, 0));
Assert.assertTrue(mgr.getLabelsToNodes().isEmpty());
assertLabelsToNodesEquals(
mgr.getLabelsToNodes(), transposeNodeToLabels(mgr.getNodeLabels()));
// Add labels and replace labels on node
mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3")));
assertLabelsToNodesEquals(
mgr.getLabelsToNodes(), transposeNodeToLabels(mgr.getNodeLabels()));
// Activate a node for which host to label mapping exists
mgr.activateNode(NodeId.newInstance("n1", 2), Resource.newInstance(10, 0));
assertLabelsToNodesEquals(
mgr.getLabelsToNodes(), transposeNodeToLabels(mgr.getNodeLabels()));
// Deactivate a node. Label mapping should still exist.
mgr.deactivateNode(NodeId.newInstance("n1", 1));
assertLabelsToNodesEquals(
mgr.getLabelsToNodes(), transposeNodeToLabels(mgr.getNodeLabels()));
}
}