YARN-4454. NM to nodelabel mapping going wrong after RM restart. (Bibin A Chundatt via wangda)
(cherry picked from commit bc038b382c)
This commit is contained in:
parent
12caea3274
commit
f16daa59a4
|
@ -1113,6 +1113,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-4461. Redundant nodeLocalityDelay log in LeafQueue (Eric Payne via
|
YARN-4461. Redundant nodeLocalityDelay log in LeafQueue (Eric Payne via
|
||||||
jlowe)
|
jlowe)
|
||||||
|
|
||||||
|
YARN-4454. NM to nodelabel mapping going wrong after RM restart.
|
||||||
|
(Bibin A Chundatt via wangda)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -29,6 +29,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
@ -1074,7 +1075,7 @@ public class CommonNodeLabelsManager extends AbstractService {
|
||||||
|
|
||||||
protected Map<NodeId, Set<String>> normalizeNodeIdToLabels(
|
protected Map<NodeId, Set<String>> normalizeNodeIdToLabels(
|
||||||
Map<NodeId, Set<String>> nodeIdToLabels) {
|
Map<NodeId, Set<String>> nodeIdToLabels) {
|
||||||
Map<NodeId, Set<String>> newMap = new HashMap<NodeId, Set<String>>();
|
Map<NodeId, Set<String>> newMap = new TreeMap<NodeId, Set<String>>();
|
||||||
for (Entry<NodeId, Set<String>> entry : nodeIdToLabels.entrySet()) {
|
for (Entry<NodeId, Set<String>> entry : nodeIdToLabels.entrySet()) {
|
||||||
NodeId id = entry.getKey();
|
NodeId id = entry.getKey();
|
||||||
Set<String> labels = entry.getValue();
|
Set<String> labels = entry.getValue();
|
||||||
|
|
|
@ -2179,8 +2179,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
nodeLabelManager = rm2.getRMContext().getNodeLabelManager();
|
nodeLabelManager = rm2.getRMContext().getNodeLabelManager();
|
||||||
Assert.assertEquals(clusterNodeLabels.size(), nodeLabelManager
|
Assert.assertEquals(clusterNodeLabels.size(),
|
||||||
.getClusterNodeLabelNames().size());
|
nodeLabelManager.getClusterNodeLabelNames().size());
|
||||||
|
|
||||||
nodeLabels = nodeLabelManager.getNodeLabels();
|
nodeLabels = nodeLabelManager.getNodeLabels();
|
||||||
Assert.assertEquals(1, nodeLabelManager.getNodeLabels().size());
|
Assert.assertEquals(1, nodeLabelManager.getNodeLabels().size());
|
||||||
|
@ -2256,4 +2256,69 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
return set;
|
return set;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testRMRestartNodeMapping() throws Exception {
|
||||||
|
// Initial FS node label store root dir to a random tmp dir
|
||||||
|
File nodeLabelFsStoreDir = new File("target",
|
||||||
|
this.getClass().getSimpleName() + "-testRMRestartNodeMapping");
|
||||||
|
if (nodeLabelFsStoreDir.exists()) {
|
||||||
|
FileUtils.deleteDirectory(nodeLabelFsStoreDir);
|
||||||
|
}
|
||||||
|
nodeLabelFsStoreDir.deleteOnExit();
|
||||||
|
String nodeLabelFsStoreDirURI = nodeLabelFsStoreDir.toURI().toString();
|
||||||
|
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
|
||||||
|
nodeLabelFsStoreDirURI);
|
||||||
|
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||||
|
mgr.init(getConfig());
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm1.init(conf);
|
||||||
|
rm1.start();
|
||||||
|
RMNodeLabelsManager nodeLabelManager =
|
||||||
|
rm1.getRMContext().getNodeLabelManager();
|
||||||
|
|
||||||
|
Set<String> clusterNodeLabels = new HashSet<String>();
|
||||||
|
clusterNodeLabels.add("x");
|
||||||
|
clusterNodeLabels.add("y");
|
||||||
|
nodeLabelManager
|
||||||
|
.addToCluserNodeLabelsWithDefaultExclusivity(clusterNodeLabels);
|
||||||
|
// Add node Label to Node h1->x
|
||||||
|
NodeId n1 = NodeId.newInstance("h1", 1234);
|
||||||
|
NodeId n2 = NodeId.newInstance("h1", 1235);
|
||||||
|
NodeId nihost = NodeId.newInstance("h1", 0);
|
||||||
|
nodeLabelManager.replaceLabelsOnNode(ImmutableMap.of(n1, toSet("x")));
|
||||||
|
nodeLabelManager.replaceLabelsOnNode(ImmutableMap.of(n2, toSet("x")));
|
||||||
|
nodeLabelManager.replaceLabelsOnNode(ImmutableMap.of(nihost, toSet("y")));
|
||||||
|
nodeLabelManager.replaceLabelsOnNode(ImmutableMap.of(n1, toSet("x")));
|
||||||
|
MockRM rm2 = null;
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
rm2 = new MockRM(conf, memStore) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||||
|
mgr.init(getConfig());
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm2.init(conf);
|
||||||
|
rm2.start();
|
||||||
|
|
||||||
|
nodeLabelManager = rm2.getRMContext().getNodeLabelManager();
|
||||||
|
Map<String, Set<NodeId>> labelsToNodes =
|
||||||
|
nodeLabelManager.getLabelsToNodes(toSet("x"));
|
||||||
|
Assert.assertEquals(1,
|
||||||
|
null == labelsToNodes.get("x") ? 0 : labelsToNodes.get("x").size());
|
||||||
|
}
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue