YARN-3014. Replaces labels on a host should update all NM's labels on that host. Contributed by Wangda Tan
(cherry picked from commit a260406268
)
This commit is contained in:
parent
68709cb9be
commit
36b3dcaab2
|
@ -310,6 +310,9 @@ Release 2.7.0 - UNRELEASED
|
|||
YARN-2997. Fixed NodeStatusUpdater to not send alreay-sent completed
|
||||
container statuses on heartbeat. (Chengbing Liu via jianhe)
|
||||
|
||||
YARN-3014. Replaces labels on a host should update all NM's labels on that
|
||||
host. (Wangda Tan via jianhe)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.nodelabels;
|
|||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
|
@ -130,6 +131,12 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
return c;
|
||||
}
|
||||
}
|
||||
|
||||
private enum NodeLabelUpdateOperation {
|
||||
ADD,
|
||||
REMOVE,
|
||||
REPLACE
|
||||
}
|
||||
|
||||
private final class ForwardingEventHandler implements
|
||||
EventHandler<NodeLabelsStoreEvent> {
|
||||
|
@ -290,45 +297,6 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void internalAddLabelsToNode(
|
||||
Map<NodeId, Set<String>> addedLabelsToNode) throws IOException {
|
||||
// do add labels to nodes
|
||||
Map<NodeId, Set<String>> newNMToLabels =
|
||||
new HashMap<NodeId, Set<String>>();
|
||||
for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
|
||||
NodeId nodeId = entry.getKey();
|
||||
Set<String> labels = entry.getValue();
|
||||
|
||||
createHostIfNonExisted(nodeId.getHost());
|
||||
if (nodeId.getPort() == WILDCARD_PORT) {
|
||||
Host host = nodeCollections.get(nodeId.getHost());
|
||||
host.labels.addAll(labels);
|
||||
newNMToLabels.put(nodeId, host.labels);
|
||||
} else {
|
||||
createNodeIfNonExisted(nodeId);
|
||||
Node nm = getNMInNodeSet(nodeId);
|
||||
if (nm.labels == null) {
|
||||
nm.labels = new HashSet<String>();
|
||||
}
|
||||
nm.labels.addAll(labels);
|
||||
newNMToLabels.put(nodeId, nm.labels);
|
||||
}
|
||||
}
|
||||
|
||||
if (null != dispatcher) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
|
||||
}
|
||||
|
||||
// shows node->labels we added
|
||||
LOG.info("addLabelsToNode:");
|
||||
for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
|
||||
LOG.info(" NM=" + entry.getKey() + ", labels=["
|
||||
+ StringUtils.join(entry.getValue().iterator(), ",") + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* add more labels to nodes
|
||||
*
|
||||
|
@ -338,7 +306,7 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
throws IOException {
|
||||
addedLabelsToNode = normalizeNodeIdToLabels(addedLabelsToNode);
|
||||
checkAddLabelsToNode(addedLabelsToNode);
|
||||
internalAddLabelsToNode(addedLabelsToNode);
|
||||
internalUpdateLabelsOnNodes(addedLabelsToNode, NodeLabelUpdateOperation.ADD);
|
||||
}
|
||||
|
||||
protected void checkRemoveFromClusterNodeLabels(
|
||||
|
@ -469,24 +437,75 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void internalRemoveLabelsFromNode(
|
||||
Map<NodeId, Set<String>> removeLabelsFromNode) {
|
||||
// do remove labels from nodes
|
||||
protected void internalUpdateLabelsOnNodes(
|
||||
Map<NodeId, Set<String>> nodeToLabels, NodeLabelUpdateOperation op)
|
||||
throws IOException {
|
||||
// do update labels from nodes
|
||||
Map<NodeId, Set<String>> newNMToLabels =
|
||||
new HashMap<NodeId, Set<String>>();
|
||||
for (Entry<NodeId, Set<String>> entry : removeLabelsFromNode.entrySet()) {
|
||||
for (Entry<NodeId, Set<String>> entry : nodeToLabels.entrySet()) {
|
||||
NodeId nodeId = entry.getKey();
|
||||
Set<String> labels = entry.getValue();
|
||||
|
||||
createHostIfNonExisted(nodeId.getHost());
|
||||
if (nodeId.getPort() == WILDCARD_PORT) {
|
||||
Host host = nodeCollections.get(nodeId.getHost());
|
||||
host.labels.removeAll(labels);
|
||||
switch (op) {
|
||||
case REMOVE:
|
||||
host.labels.removeAll(labels);
|
||||
for (Node node : host.nms.values()) {
|
||||
if (node.labels != null) {
|
||||
node.labels.removeAll(labels);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case ADD:
|
||||
host.labels.addAll(labels);
|
||||
for (Node node : host.nms.values()) {
|
||||
if (node.labels != null) {
|
||||
node.labels.addAll(labels);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case REPLACE:
|
||||
host.labels.clear();
|
||||
host.labels.addAll(labels);
|
||||
for (Node node : host.nms.values()) {
|
||||
node.labels = null;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
newNMToLabels.put(nodeId, host.labels);
|
||||
} else {
|
||||
Node nm = getNMInNodeSet(nodeId);
|
||||
if (nm.labels != null) {
|
||||
nm.labels.removeAll(labels);
|
||||
if (EnumSet.of(NodeLabelUpdateOperation.ADD,
|
||||
NodeLabelUpdateOperation.REPLACE).contains(op)) {
|
||||
// Add and replace
|
||||
createNodeIfNonExisted(nodeId);
|
||||
Node nm = getNMInNodeSet(nodeId);
|
||||
if (nm.labels == null) {
|
||||
nm.labels = new HashSet<String>();
|
||||
}
|
||||
switch (op) {
|
||||
case ADD:
|
||||
nm.labels.addAll(labels);
|
||||
break;
|
||||
case REPLACE:
|
||||
nm.labels.clear();
|
||||
nm.labels.addAll(labels);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
newNMToLabels.put(nodeId, nm.labels);
|
||||
} else {
|
||||
// remove
|
||||
Node nm = getNMInNodeSet(nodeId);
|
||||
if (nm.labels != null) {
|
||||
nm.labels.removeAll(labels);
|
||||
newNMToLabels.put(nodeId, nm.labels);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -497,7 +516,7 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
}
|
||||
|
||||
// shows node->labels we added
|
||||
LOG.info("removeLabelsFromNode:");
|
||||
LOG.info(op.name() + " labels on nodes:");
|
||||
for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
|
||||
LOG.info(" NM=" + entry.getKey() + ", labels=["
|
||||
+ StringUtils.join(entry.getValue().iterator(), ",") + "]");
|
||||
|
@ -517,7 +536,8 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
|
||||
checkRemoveLabelsFromNode(removeLabelsFromNode);
|
||||
|
||||
internalRemoveLabelsFromNode(removeLabelsFromNode);
|
||||
internalUpdateLabelsOnNodes(removeLabelsFromNode,
|
||||
NodeLabelUpdateOperation.REMOVE);
|
||||
}
|
||||
|
||||
protected void checkReplaceLabelsOnNode(
|
||||
|
@ -539,47 +559,7 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void internalReplaceLabelsOnNode(
|
||||
Map<NodeId, Set<String>> replaceLabelsToNode) throws IOException {
|
||||
// do replace labels to nodes
|
||||
Map<NodeId, Set<String>> newNMToLabels = new HashMap<NodeId, Set<String>>();
|
||||
for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
|
||||
NodeId nodeId = entry.getKey();
|
||||
Set<String> labels = entry.getValue();
|
||||
|
||||
createHostIfNonExisted(nodeId.getHost());
|
||||
if (nodeId.getPort() == WILDCARD_PORT) {
|
||||
Host host = nodeCollections.get(nodeId.getHost());
|
||||
host.labels.clear();
|
||||
host.labels.addAll(labels);
|
||||
newNMToLabels.put(nodeId, host.labels);
|
||||
} else {
|
||||
createNodeIfNonExisted(nodeId);
|
||||
Node nm = getNMInNodeSet(nodeId);
|
||||
if (nm.labels == null) {
|
||||
nm.labels = new HashSet<String>();
|
||||
}
|
||||
nm.labels.clear();
|
||||
nm.labels.addAll(labels);
|
||||
newNMToLabels.put(nodeId, nm.labels);
|
||||
}
|
||||
}
|
||||
|
||||
if (null != dispatcher) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
|
||||
}
|
||||
|
||||
// shows node->labels we added
|
||||
LOG.info("setLabelsToNode:");
|
||||
for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
|
||||
LOG.info(" NM=" + entry.getKey() + ", labels=["
|
||||
+ StringUtils.join(entry.getValue().iterator(), ",") + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* replace labels to nodes
|
||||
*
|
||||
|
@ -591,7 +571,8 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
|
||||
checkReplaceLabelsOnNode(replaceLabelsToNode);
|
||||
|
||||
internalReplaceLabelsOnNode(replaceLabelsToNode);
|
||||
internalUpdateLabelsOnNodes(replaceLabelsToNode,
|
||||
NodeLabelUpdateOperation.REPLACE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -281,4 +281,42 @@ public class TestCommonNodeLabelsManager extends NodeLabelTestBase {
|
|||
mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet(" p2 ")));
|
||||
Assert.assertTrue(mgr.getNodeLabels().isEmpty());
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void testReplaceLabelsOnHostsShouldUpdateNodesBelongTo()
|
||||
throws IOException {
|
||||
mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
|
||||
mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p1", "p2")));
|
||||
assertMapEquals(
|
||||
mgr.getNodeLabels(),
|
||||
ImmutableMap.of(toNodeId("n1"), toSet("p1", "p2")));
|
||||
|
||||
// Replace labels on n1:1 to P2
|
||||
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p2"),
|
||||
toNodeId("n1:2"), toSet("p2")));
|
||||
assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
|
||||
toSet("p1", "p2"), toNodeId("n1:1"), toSet("p2"), toNodeId("n1:2"),
|
||||
toSet("p2")));
|
||||
|
||||
// Replace labels on n1 to P1, both n1:1/n1 will be P1 now
|
||||
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
|
||||
assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
|
||||
toSet("p1"), toNodeId("n1:1"), toSet("p1"), toNodeId("n1:2"),
|
||||
toSet("p1")));
|
||||
|
||||
// Set labels on n1:1 to P2 again to verify if add/remove works
|
||||
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p2")));
|
||||
|
||||
// Add p3 to n1, should makes n1:1 to be p2/p3, and n1:2 to be p1/p3
|
||||
mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p3")));
|
||||
assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
|
||||
toSet("p1", "p3"), toNodeId("n1:1"), toSet("p2", "p3"),
|
||||
toNodeId("n1:2"), toSet("p1", "p3")));
|
||||
|
||||
// Remove P3 from n1, should makes n1:1 to be p2, and n1:2 to be p1
|
||||
mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p3")));
|
||||
assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
|
||||
toSet("p1"), toNodeId("n1:1"), toSet("p2"), toNodeId("n1:2"),
|
||||
toSet("p1")));
|
||||
}
|
||||
}
|
|
@ -37,7 +37,6 @@ import org.apache.hadoop.security.authorize.AccessControlList;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.nodelabels.NodeLabel;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
|
|
Loading…
Reference in New Issue