YARN-4215. RMNodeLabels Manager Need to verify and replace node labels for the only modified Node Label Mappings in the request. (Naganarasimha G R via wangda)
(cherry picked from commit 29a582ada0)
commit 5453a63612
parent 96fbe42bd7
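Summary of the change: RMNodeLabelsManager#replaceLabelsOnNode now first computes which of the requested node-to-labels mappings actually differ from the current state, returns early when nothing effectively changed, and applies (and later reports to the scheduler) only that modified subset. The now-redundant equality guard in CapacityScheduler is removed, and a new test asserts that NODE_LABELS_UPDATE events fire only for real changes.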
hadoop-yarn-project/CHANGES.txt
@@ -437,6 +437,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4176. Resync NM nodelabels with RM periodically for distributed nodelabels.
     (Bibin A Chundatt via wangda)
 
+    YARN-4215. RMNodeLabels Manager Need to verify and replace node labels for the
+    only modified Node Label Mappings in the request. (Naganarasimha G R via wangda)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not
RMNodeLabelsManager.java
@@ -163,13 +163,23 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     try {
       writeLock.lock();
 
-      // get nodesCollection before edition
-      Map<String, Host> before = cloneNodeMap(replaceLabelsToNode.keySet());
+      Map<NodeId, Set<String>> effectiveModifiedLabelMappings =
+          getModifiedNodeLabelsMappings(replaceLabelsToNode);
 
-      super.replaceLabelsOnNode(replaceLabelsToNode);
+      if (effectiveModifiedLabelMappings.isEmpty()) {
+        LOG.info("No Modified Node label Mapping to replace");
+        return;
+      }
+
+      // get nodesCollection before edition
+      Map<String, Host> before =
+          cloneNodeMap(effectiveModifiedLabelMappings.keySet());
+
+      super.replaceLabelsOnNode(effectiveModifiedLabelMappings);
 
       // get nodesCollection after edition
-      Map<String, Host> after = cloneNodeMap(replaceLabelsToNode.keySet());
+      Map<String, Host> after =
+          cloneNodeMap(effectiveModifiedLabelMappings.keySet());
 
       // update running nodes resources
       updateResourceMappings(before, after);
@@ -178,6 +188,32 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     }
   }
 
+  private Map<NodeId, Set<String>> getModifiedNodeLabelsMappings(
+      Map<NodeId, Set<String>> replaceLabelsToNode) {
+    Map<NodeId, Set<String>> effectiveModifiedLabels = new HashMap<>();
+    for (Entry<NodeId, Set<String>> nodeLabelMappingEntry : replaceLabelsToNode
+        .entrySet()) {
+      NodeId nodeId = nodeLabelMappingEntry.getKey();
+      Set<String> modifiedNodeLabels = nodeLabelMappingEntry.getValue();
+      Set<String> labelsBeforeModification = null;
+      Host host = nodeCollections.get(nodeId.getHost());
+      if (host == null) {
+        effectiveModifiedLabels.put(nodeId, modifiedNodeLabels);
+        continue;
+      } else if (nodeId.getPort() == WILDCARD_PORT) {
+        labelsBeforeModification = host.labels;
+      } else if (host.nms.get(nodeId) != null) {
+        labelsBeforeModification = host.nms.get(nodeId).labels;
+      }
+      if (labelsBeforeModification == null
+          || labelsBeforeModification.size() != modifiedNodeLabels.size()
+          || !labelsBeforeModification.containsAll(modifiedNodeLabels)) {
+        effectiveModifiedLabels.put(nodeId, modifiedNodeLabels);
+      }
+    }
+    return effectiveModifiedLabels;
+  }
+
   /*
    * Following methods are used for setting if a node is up and running, and it
    * will update running nodes resource
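The helper treats a mapping as effectively modified in three cases: the host is not tracked yet, a wildcard-port NodeId's labels differ from the host-level set, or a concrete NM's labels differ from its stored set. Note also that the early return in replaceLabelsOnNode sits inside the try block, so the write lock is presumably still released by the method's finally clause, which this hunk elides. Below is a minimal, self-contained sketch of the same delta check, with the RM's Host/nodeCollections bookkeeping collapsed into one plain map; the class and method names are illustrative and not part of the patch (requires Java 9+ for Map.of/Set.of):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in for getModifiedNodeLabelsMappings: keep only the
// entries whose requested label set differs from the currently stored one.
public class EffectiveDeltaSketch {
  static Map<String, Set<String>> effectiveDelta(
      Map<String, Set<String>> current, Map<String, Set<String>> requested) {
    Map<String, Set<String>> modified = new HashMap<>();
    for (Map.Entry<String, Set<String>> e : requested.entrySet()) {
      Set<String> before = current.get(e.getKey());
      // An unknown node, or a set differing in size or membership, counts
      // as an effective modification.
      if (before == null || before.size() != e.getValue().size()
          || !before.containsAll(e.getValue())) {
        modified.put(e.getKey(), e.getValue());
      }
    }
    return modified;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> current =
        Map.of("n1:1", Set.of("p1"), "n2:1", Set.of("p2"));
    Map<String, Set<String>> requested =
        Map.of("n1:1", Set.of("p1"), "n2:1", Set.of("p3"));
    // Prints {n2:1=[p3]}: the n1:1 entry is unchanged and is filtered out.
    System.out.println(effectiveDelta(current, requested));
  }
}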
CapacityScheduler.java
@@ -100,9 +100,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -1042,12 +1042,6 @@ public class CapacityScheduler extends
       return;
     }
 
-    // labels is same, we don't need do update
-    if (node.getLabels().size() == newLabels.size()
-        && node.getLabels().containsAll(newLabels)) {
-      return;
-    }
-
     // Get new partition, we have only one partition per node
     String newPartition;
     if (newLabels.isEmpty()) {
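This equality guard becomes dead code with the patch: the same size-plus-containsAll comparison now happens upstream in getModifiedNodeLabelsMappings, so by the time CapacityScheduler sees a NODE_LABELS_UPDATE event the label set for the node is known to have changed.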
TestRMNodeLabelsManager.java
@@ -17,6 +17,11 @@
  */
 
 package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
@@ -31,10 +36,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
-import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
 import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
+import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
@@ -434,7 +446,88 @@ public class TestRMNodeLabelsManager extends NodeLabelTestBase {
       Assert.fail("IOException from removeLabelsFromNode " + e);
     }
   }
 
+  private static class SchedulerEventHandler
+      implements EventHandler<SchedulerEvent> {
+    Map<NodeId, Set<String>> updatedNodeToLabels = new HashMap<>();
+    boolean receivedEvent;
+
+    @Override
+    public void handle(SchedulerEvent event) {
+      switch (event.getType()) {
+      case NODE_LABELS_UPDATE:
+        receivedEvent = true;
+        updatedNodeToLabels =
+            ((NodeLabelsUpdateSchedulerEvent) event).getUpdatedNodeToLabels();
+        break;
+      default:
+        break;
+      }
+    }
+  }
+
+  @Test
+  public void testReplaceLabelsFromNode() throws Exception {
+    RMContext rmContext = mock(RMContext.class);
+    Dispatcher syncDispatcher = new InlineDispatcher();
+    SchedulerEventHandler schedEventsHandler = new SchedulerEventHandler();
+    syncDispatcher.register(SchedulerEventType.class, schedEventsHandler);
+    when(rmContext.getDispatcher()).thenReturn(syncDispatcher);
+    mgr.setRMContext(rmContext);
+
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p1", "p2", "p3"));
+    mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n2", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n3", 1), SMALL_RESOURCE);
+
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p1"),
+        toNodeId("n2:1"), toSet("p2"), toNodeId("n3"), toSet("p3")));
+    assertTrue("Event should be sent when there is change in labels",
+        schedEventsHandler.receivedEvent);
+    assertEquals("3 node label mapping modified", 3,
+        schedEventsHandler.updatedNodeToLabels.size());
+    ImmutableMap<NodeId, Set<String>> modifiedMap =
+        ImmutableMap.of(toNodeId("n1:1"), toSet("p1"), toNodeId("n2:1"),
+            toSet("p2"), toNodeId("n3:1"), toSet("p3"));
+    assertEquals("Node label mapping is not matching", modifiedMap,
+        schedEventsHandler.updatedNodeToLabels);
+    schedEventsHandler.receivedEvent = false;
+
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p1")));
+    assertFalse("No event should be sent when there is no change in labels",
+        schedEventsHandler.receivedEvent);
+    schedEventsHandler.receivedEvent = false;
+
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n2:1"), toSet("p1"),
+        toNodeId("n3"), toSet("p3")));
+    assertTrue("Event should be sent when there is change in labels",
+        schedEventsHandler.receivedEvent);
+    assertEquals("Single node label mapping modified", 1,
+        schedEventsHandler.updatedNodeToLabels.size());
+    assertCollectionEquals(toSet("p1"),
+        schedEventsHandler.updatedNodeToLabels.get(toNodeId("n2:1")));
+    schedEventsHandler.receivedEvent = false;
+
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n3"), toSet("p2")));
+    assertTrue("Event should be sent when there is change in labels @ HOST",
+        schedEventsHandler.receivedEvent);
+    assertEquals("Single node label mapping modified", 1,
+        schedEventsHandler.updatedNodeToLabels.size());
+    assertCollectionEquals(toSet("p2"),
+        schedEventsHandler.updatedNodeToLabels.get(toNodeId("n3:1")));
+    schedEventsHandler.receivedEvent = false;
+
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2")));
+    assertTrue(
+        "Event should be sent when labels are modified at host though labels were set @ NM level",
+        schedEventsHandler.receivedEvent);
+    assertEquals("Single node label mapping modified", 1,
+        schedEventsHandler.updatedNodeToLabels.size());
+    assertCollectionEquals(toSet("p2"),
+        schedEventsHandler.updatedNodeToLabels.get(toNodeId("n1:1")));
+    schedEventsHandler.receivedEvent = false;
+  }
+
   @Test(timeout = 5000)
   public void testGetLabelsOnNodesWhenNodeActiveDeactive() throws Exception {
     mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p1", "p2", "p3"));
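A note on the test design: InlineDispatcher delivers events synchronously on the calling thread, so each assertion placed directly after replaceLabelsOnNode reliably observes whether an event was fired. The five cases cover an initial assignment to three nodes (all three mappings reported), an identical repeat request (no event), a partially overlapping request (only the changed node reported), a host-level wildcard-port change surfacing as the NM-level NodeId n3:1, and a host-level change overriding labels previously set at the NM level on n1.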