diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2cc30495202..99c8514814f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -344,6 +344,9 @@ Release 2.7.0 - UNRELEASED YARN-2997. Fixed NodeStatusUpdater to not send alreay-sent completed container statuses on heartbeat. (Chengbing Liu via jianhe) + YARN-3014. Replaces labels on a host should update all NM's labels on that + host. (Wangda Tan via jianhe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index e888cc56d82..aeefff1fc64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -130,6 +131,12 @@ public Node copy() { return c; } } + + private enum NodeLabelUpdateOperation { + ADD, + REMOVE, + REPLACE + } private final class ForwardingEventHandler implements EventHandler { @@ -290,45 +297,6 @@ protected void checkAddLabelsToNode( } } - @SuppressWarnings("unchecked") - protected void internalAddLabelsToNode( - Map> addedLabelsToNode) throws IOException { - // do add labels to nodes - Map> newNMToLabels = - new HashMap>(); - for (Entry> entry : addedLabelsToNode.entrySet()) { - NodeId nodeId = entry.getKey(); - Set labels = entry.getValue(); - - createHostIfNonExisted(nodeId.getHost()); - if (nodeId.getPort() == WILDCARD_PORT) { - Host host = nodeCollections.get(nodeId.getHost()); - host.labels.addAll(labels); - newNMToLabels.put(nodeId, host.labels); - } else { - createNodeIfNonExisted(nodeId); - Node nm = getNMInNodeSet(nodeId); - if (nm.labels == null) { - nm.labels = new HashSet(); - } - nm.labels.addAll(labels); - newNMToLabels.put(nodeId, nm.labels); - } - } - - if (null != dispatcher) { - dispatcher.getEventHandler().handle( - new UpdateNodeToLabelsMappingsEvent(newNMToLabels)); - } - - // shows node->labels we added - LOG.info("addLabelsToNode:"); - for (Entry> entry : newNMToLabels.entrySet()) { - LOG.info(" NM=" + entry.getKey() + ", labels=[" - + StringUtils.join(entry.getValue().iterator(), ",") + "]"); - } - } - /** * add more labels to nodes * @@ -338,7 +306,7 @@ public void addLabelsToNode(Map> addedLabelsToNode) throws IOException { addedLabelsToNode = normalizeNodeIdToLabels(addedLabelsToNode); checkAddLabelsToNode(addedLabelsToNode); - internalAddLabelsToNode(addedLabelsToNode); + internalUpdateLabelsOnNodes(addedLabelsToNode, NodeLabelUpdateOperation.ADD); } protected void checkRemoveFromClusterNodeLabels( @@ -469,24 +437,75 @@ protected void checkRemoveLabelsFromNode( } @SuppressWarnings("unchecked") - protected void internalRemoveLabelsFromNode( - Map> removeLabelsFromNode) { - // do remove labels from nodes + protected void internalUpdateLabelsOnNodes( + Map> nodeToLabels, NodeLabelUpdateOperation op) + throws IOException { + // do update labels from nodes Map> newNMToLabels = new HashMap>(); - for (Entry> entry : removeLabelsFromNode.entrySet()) { + for (Entry> entry : nodeToLabels.entrySet()) { NodeId nodeId = entry.getKey(); Set labels = entry.getValue(); + createHostIfNonExisted(nodeId.getHost()); if (nodeId.getPort() == WILDCARD_PORT) { Host host = nodeCollections.get(nodeId.getHost()); - host.labels.removeAll(labels); + switch (op) { + case REMOVE: + host.labels.removeAll(labels); + for (Node node : host.nms.values()) { + if (node.labels != null) { + node.labels.removeAll(labels); + } + } + break; + case ADD: + host.labels.addAll(labels); + for (Node node : host.nms.values()) { + if (node.labels != null) { + node.labels.addAll(labels); + } + } + break; + case REPLACE: + host.labels.clear(); + host.labels.addAll(labels); + for (Node node : host.nms.values()) { + node.labels = null; + } + break; + default: + break; + } newNMToLabels.put(nodeId, host.labels); } else { - Node nm = getNMInNodeSet(nodeId); - if (nm.labels != null) { - nm.labels.removeAll(labels); + if (EnumSet.of(NodeLabelUpdateOperation.ADD, + NodeLabelUpdateOperation.REPLACE).contains(op)) { + // Add and replace + createNodeIfNonExisted(nodeId); + Node nm = getNMInNodeSet(nodeId); + if (nm.labels == null) { + nm.labels = new HashSet(); + } + switch (op) { + case ADD: + nm.labels.addAll(labels); + break; + case REPLACE: + nm.labels.clear(); + nm.labels.addAll(labels); + break; + default: + break; + } newNMToLabels.put(nodeId, nm.labels); + } else { + // remove + Node nm = getNMInNodeSet(nodeId); + if (nm.labels != null) { + nm.labels.removeAll(labels); + newNMToLabels.put(nodeId, nm.labels); + } } } } @@ -497,7 +516,7 @@ protected void internalRemoveLabelsFromNode( } // shows node->labels we added - LOG.info("removeLabelsFromNode:"); + LOG.info(op.name() + " labels on nodes:"); for (Entry> entry : newNMToLabels.entrySet()) { LOG.info(" NM=" + entry.getKey() + ", labels=[" + StringUtils.join(entry.getValue().iterator(), ",") + "]"); @@ -517,7 +536,8 @@ protected void internalRemoveLabelsFromNode( checkRemoveLabelsFromNode(removeLabelsFromNode); - internalRemoveLabelsFromNode(removeLabelsFromNode); + internalUpdateLabelsOnNodes(removeLabelsFromNode, + NodeLabelUpdateOperation.REMOVE); } protected void checkReplaceLabelsOnNode( @@ -539,47 +559,7 @@ protected void checkReplaceLabelsOnNode( } } } - - @SuppressWarnings("unchecked") - protected void internalReplaceLabelsOnNode( - Map> replaceLabelsToNode) throws IOException { - // do replace labels to nodes - Map> newNMToLabels = new HashMap>(); - for (Entry> entry : replaceLabelsToNode.entrySet()) { - NodeId nodeId = entry.getKey(); - Set labels = entry.getValue(); - createHostIfNonExisted(nodeId.getHost()); - if (nodeId.getPort() == WILDCARD_PORT) { - Host host = nodeCollections.get(nodeId.getHost()); - host.labels.clear(); - host.labels.addAll(labels); - newNMToLabels.put(nodeId, host.labels); - } else { - createNodeIfNonExisted(nodeId); - Node nm = getNMInNodeSet(nodeId); - if (nm.labels == null) { - nm.labels = new HashSet(); - } - nm.labels.clear(); - nm.labels.addAll(labels); - newNMToLabels.put(nodeId, nm.labels); - } - } - - if (null != dispatcher) { - dispatcher.getEventHandler().handle( - new UpdateNodeToLabelsMappingsEvent(newNMToLabels)); - } - - // shows node->labels we added - LOG.info("setLabelsToNode:"); - for (Entry> entry : newNMToLabels.entrySet()) { - LOG.info(" NM=" + entry.getKey() + ", labels=[" - + StringUtils.join(entry.getValue().iterator(), ",") + "]"); - } - } - /** * replace labels to nodes * @@ -591,7 +571,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) checkReplaceLabelsOnNode(replaceLabelsToNode); - internalReplaceLabelsOnNode(replaceLabelsToNode); + internalUpdateLabelsOnNodes(replaceLabelsToNode, + NodeLabelUpdateOperation.REPLACE); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java index a56a5955770..c0b05e3b08f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java @@ -281,4 +281,42 @@ public void testTrimLabelsWhenModifyLabelsOnNodes() throws IOException { mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet(" p2 "))); Assert.assertTrue(mgr.getNodeLabels().isEmpty()); } + + @Test(timeout = 5000) + public void testReplaceLabelsOnHostsShouldUpdateNodesBelongTo() + throws IOException { + mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3")); + mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p1", "p2"))); + assertMapEquals( + mgr.getNodeLabels(), + ImmutableMap.of(toNodeId("n1"), toSet("p1", "p2"))); + + // Replace labels on n1:1 to P2 + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p2"), + toNodeId("n1:2"), toSet("p2"))); + assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"), + toSet("p1", "p2"), toNodeId("n1:1"), toSet("p2"), toNodeId("n1:2"), + toSet("p2"))); + + // Replace labels on n1 to P1, both n1:1/n1 will be P1 now + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"))); + assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"), + toSet("p1"), toNodeId("n1:1"), toSet("p1"), toNodeId("n1:2"), + toSet("p1"))); + + // Set labels on n1:1 to P2 again to verify if add/remove works + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p2"))); + + // Add p3 to n1, should makes n1:1 to be p2/p3, and n1:2 to be p1/p3 + mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p3"))); + assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"), + toSet("p1", "p3"), toNodeId("n1:1"), toSet("p2", "p3"), + toNodeId("n1:2"), toSet("p1", "p3"))); + + // Remove P3 from n1, should makes n1:1 to be p2, and n1:2 to be p1 + mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p3"))); + assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"), + toSet("p1"), toNodeId("n1:1"), toSet("p2"), toNodeId("n1:2"), + toSet("p1"))); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java index 828d1bc34da..18478e36152 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.NodeLabel; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;