YARN-2699. Fixed a bug in CommonNodeLabelsManager that caused tests to fail when using ephemeral ports on NodeIDs. Contributed by Wangda Tan.
(cherry picked from commit abae63caf9
)
This commit is contained in:
parent
d4606bff3c
commit
778fbcd5d3
|
@ -524,6 +524,9 @@ Release 2.6.0 - UNRELEASED
|
|||
tracking per label when a host runs multiple node-managers. (Wangda Tan via
|
||||
vinodkv)
|
||||
|
||||
YARN-2699. Fixed a bug in CommonNodeLabelsManager that caused tests to fail
|
||||
when using ephemeral ports on NodeIDs. (Wangda Tan via vinodkv)
|
||||
|
||||
BREAKDOWN OF YARN-1051 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
YARN-1707. Introduce APIs to add/remove/resize queues in the
|
||||
|
|
|
@ -299,14 +299,14 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
|
||||
NodeId nodeId = entry.getKey();
|
||||
Set<String> labels = entry.getValue();
|
||||
|
||||
createNodeIfNonExisted(entry.getKey());
|
||||
|
||||
|
||||
createHostIfNonExisted(nodeId.getHost());
|
||||
if (nodeId.getPort() == WILDCARD_PORT) {
|
||||
Host host = nodeCollections.get(nodeId.getHost());
|
||||
host.labels.addAll(labels);
|
||||
newNMToLabels.put(nodeId, host.labels);
|
||||
} else {
|
||||
createNodeIfNonExisted(nodeId);
|
||||
Node nm = getNMInNodeSet(nodeId);
|
||||
if (nm.labels == null) {
|
||||
nm.labels = new HashSet<String>();
|
||||
|
@ -534,21 +534,21 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void internalReplaceLabelsOnNode(
|
||||
Map<NodeId, Set<String>> replaceLabelsToNode) {
|
||||
Map<NodeId, Set<String>> replaceLabelsToNode) throws IOException {
|
||||
// do replace labels to nodes
|
||||
Map<NodeId, Set<String>> newNMToLabels = new HashMap<NodeId, Set<String>>();
|
||||
for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
|
||||
NodeId nodeId = entry.getKey();
|
||||
Set<String> labels = entry.getValue();
|
||||
|
||||
// update nodeCollections
|
||||
createNodeIfNonExisted(entry.getKey());
|
||||
createHostIfNonExisted(nodeId.getHost());
|
||||
if (nodeId.getPort() == WILDCARD_PORT) {
|
||||
Host host = nodeCollections.get(nodeId.getHost());
|
||||
host.labels.clear();
|
||||
host.labels.addAll(labels);
|
||||
newNMToLabels.put(nodeId, host.labels);
|
||||
} else {
|
||||
createNodeIfNonExisted(nodeId);
|
||||
Node nm = getNMInNodeSet(nodeId);
|
||||
if (nm.labels == null) {
|
||||
nm.labels = new HashSet<String>();
|
||||
|
@ -672,10 +672,6 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
|
||||
protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map,
|
||||
boolean checkRunning) {
|
||||
if (WILDCARD_PORT == nodeId.getPort()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Host host = map.get(nodeId.getHost());
|
||||
if (null == host) {
|
||||
return null;
|
||||
|
@ -707,17 +703,22 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
}
|
||||
}
|
||||
|
||||
protected void createNodeIfNonExisted(NodeId nodeId) {
|
||||
protected void createNodeIfNonExisted(NodeId nodeId) throws IOException {
|
||||
Host host = nodeCollections.get(nodeId.getHost());
|
||||
if (null == host) {
|
||||
host = new Host();
|
||||
nodeCollections.put(nodeId.getHost(), host);
|
||||
throw new IOException("Should create host before creating node.");
|
||||
}
|
||||
if (nodeId.getPort() != WILDCARD_PORT) {
|
||||
Node nm = host.nms.get(nodeId);
|
||||
if (null == nm) {
|
||||
host.nms.put(nodeId, new Node());
|
||||
}
|
||||
Node nm = host.nms.get(nodeId);
|
||||
if (null == nm) {
|
||||
host.nms.put(nodeId, new Node());
|
||||
}
|
||||
}
|
||||
|
||||
protected void createHostIfNonExisted(String hostName) {
|
||||
Host host = nodeCollections.get(hostName);
|
||||
if (null == host) {
|
||||
host = new Host();
|
||||
nodeCollections.put(hostName, host);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -181,7 +181,15 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
|
|||
// save if we have a node before
|
||||
Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
|
||||
|
||||
createNodeIfNonExisted(nodeId);
|
||||
createHostIfNonExisted(nodeId.getHost());
|
||||
try {
|
||||
createNodeIfNonExisted(nodeId);
|
||||
} catch (IOException e) {
|
||||
LOG.error("This shouldn't happen, cannot get host in nodeCollection"
|
||||
+ " associated to the node being activated");
|
||||
return;
|
||||
}
|
||||
|
||||
Node nm = getNMInNodeSet(nodeId);
|
||||
nm.resource = resource;
|
||||
nm.running = true;
|
||||
|
@ -220,7 +228,7 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
|
|||
}
|
||||
}
|
||||
|
||||
public void updateNodeResource(NodeId node, Resource newResource) {
|
||||
public void updateNodeResource(NodeId node, Resource newResource) throws IOException {
|
||||
deactivateNode(node);
|
||||
activateNode(node, newResource);
|
||||
}
|
||||
|
|
|
@ -96,6 +96,14 @@ public class TestRMNodeLabelsManager extends NodeLabelTestBase {
|
|||
Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
|
||||
Resources.add(SMALL_RESOURCE, LARGE_NODE));
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void testActivateNodeManagerWithZeroPort() throws Exception {
|
||||
// active two NM, one is zero port , another is non-zero port. no exception
|
||||
// should be raised
|
||||
mgr.activateNode(NodeId.newInstance("n1", 0), SMALL_RESOURCE);
|
||||
mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Test(timeout = 5000)
|
||||
|
|
Loading…
Reference in New Issue