YARN-4215. RMNodeLabels Manager Need to verify and replace node labels for the only modified Node Label Mappings in the request. (Naganarasimha G R via wangda)
This commit is contained in:
parent
a8b4d0ff28
commit
29a582ada0
|
@ -489,6 +489,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-4176. Resync NM nodelabels with RM periodically for distributed nodelabels.
|
||||
(Bibin A Chundatt via wangda)
|
||||
|
||||
YARN-4215. RMNodeLabels Manager Need to verify and replace node labels for the
|
||||
only modified Node Label Mappings in the request. (Naganarasimha G R via wangda)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||
|
|
|
@ -163,13 +163,23 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
|
|||
try {
|
||||
writeLock.lock();
|
||||
|
||||
// get nodesCollection before edition
|
||||
Map<String, Host> before = cloneNodeMap(replaceLabelsToNode.keySet());
|
||||
Map<NodeId, Set<String>> effectiveModifiedLabelMappings =
|
||||
getModifiedNodeLabelsMappings(replaceLabelsToNode);
|
||||
|
||||
super.replaceLabelsOnNode(replaceLabelsToNode);
|
||||
if(effectiveModifiedLabelMappings.isEmpty()) {
|
||||
LOG.info("No Modified Node label Mapping to replace");
|
||||
return;
|
||||
}
|
||||
|
||||
// get nodesCollection before edition
|
||||
Map<String, Host> before =
|
||||
cloneNodeMap(effectiveModifiedLabelMappings.keySet());
|
||||
|
||||
super.replaceLabelsOnNode(effectiveModifiedLabelMappings);
|
||||
|
||||
// get nodesCollection after edition
|
||||
Map<String, Host> after = cloneNodeMap(replaceLabelsToNode.keySet());
|
||||
Map<String, Host> after =
|
||||
cloneNodeMap(effectiveModifiedLabelMappings.keySet());
|
||||
|
||||
// update running nodes resources
|
||||
updateResourceMappings(before, after);
|
||||
|
@ -178,6 +188,32 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
|
|||
}
|
||||
}
|
||||
|
||||
private Map<NodeId, Set<String>> getModifiedNodeLabelsMappings(
|
||||
Map<NodeId, Set<String>> replaceLabelsToNode) {
|
||||
Map<NodeId, Set<String>> effectiveModifiedLabels = new HashMap<>();
|
||||
for (Entry<NodeId, Set<String>> nodeLabelMappingEntry : replaceLabelsToNode
|
||||
.entrySet()) {
|
||||
NodeId nodeId = nodeLabelMappingEntry.getKey();
|
||||
Set<String> modifiedNodeLabels = nodeLabelMappingEntry.getValue();
|
||||
Set<String> labelsBeforeModification = null;
|
||||
Host host = nodeCollections.get(nodeId.getHost());
|
||||
if (host == null) {
|
||||
effectiveModifiedLabels.put(nodeId, modifiedNodeLabels);
|
||||
continue;
|
||||
} else if (nodeId.getPort() == WILDCARD_PORT) {
|
||||
labelsBeforeModification = host.labels;
|
||||
} else if (host.nms.get(nodeId) != null) {
|
||||
labelsBeforeModification = host.nms.get(nodeId).labels;
|
||||
}
|
||||
if (labelsBeforeModification == null
|
||||
|| labelsBeforeModification.size() != modifiedNodeLabels.size()
|
||||
|| !labelsBeforeModification.containsAll(modifiedNodeLabels)) {
|
||||
effectiveModifiedLabels.put(nodeId, modifiedNodeLabels);
|
||||
}
|
||||
}
|
||||
return effectiveModifiedLabels;
|
||||
}
|
||||
|
||||
/*
|
||||
* Following methods are used for setting if a node is up and running, and it
|
||||
* will update running nodes resource
|
||||
|
|
|
@ -100,9 +100,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
|
@ -1042,12 +1042,6 @@ public class CapacityScheduler extends
|
|||
return;
|
||||
}
|
||||
|
||||
// labels is same, we don't need do update
|
||||
if (node.getLabels().size() == newLabels.size()
|
||||
&& node.getLabels().containsAll(newLabels)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Get new partition, we have only one partition per node
|
||||
String newPartition;
|
||||
if (newLabels.isEmpty()) {
|
||||
|
|
|
@ -17,6 +17,11 @@
|
|||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -31,10 +36,17 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
|
||||
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
|
||||
import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -435,6 +447,87 @@ public class TestRMNodeLabelsManager extends NodeLabelTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
private static class SchedulerEventHandler
|
||||
implements EventHandler<SchedulerEvent> {
|
||||
Map<NodeId, Set<String>> updatedNodeToLabels = new HashMap<>();
|
||||
boolean receivedEvent;
|
||||
|
||||
@Override
|
||||
public void handle(SchedulerEvent event) {
|
||||
switch (event.getType()) {
|
||||
case NODE_LABELS_UPDATE:
|
||||
receivedEvent = true;
|
||||
updatedNodeToLabels =
|
||||
((NodeLabelsUpdateSchedulerEvent) event).getUpdatedNodeToLabels();
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplaceLabelsFromNode() throws Exception {
|
||||
RMContext rmContext = mock(RMContext.class);
|
||||
Dispatcher syncDispatcher = new InlineDispatcher();
|
||||
SchedulerEventHandler schedEventsHandler = new SchedulerEventHandler();
|
||||
syncDispatcher.register(SchedulerEventType.class, schedEventsHandler);
|
||||
when(rmContext.getDispatcher()).thenReturn(syncDispatcher);
|
||||
mgr.setRMContext(rmContext);
|
||||
|
||||
mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p1", "p2", "p3"));
|
||||
mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
|
||||
mgr.activateNode(NodeId.newInstance("n2", 1), SMALL_RESOURCE);
|
||||
mgr.activateNode(NodeId.newInstance("n3", 1), SMALL_RESOURCE);
|
||||
|
||||
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p1"),
|
||||
toNodeId("n2:1"), toSet("p2"), toNodeId("n3"), toSet("p3")));
|
||||
assertTrue("Event should be sent when there is change in labels",
|
||||
schedEventsHandler.receivedEvent);
|
||||
assertEquals("3 node label mapping modified", 3,
|
||||
schedEventsHandler.updatedNodeToLabels.size());
|
||||
ImmutableMap<NodeId, Set<String>> modifiedMap =
|
||||
ImmutableMap.of(toNodeId("n1:1"), toSet("p1"), toNodeId("n2:1"),
|
||||
toSet("p2"), toNodeId("n3:1"), toSet("p3"));
|
||||
assertEquals("Node label mapping is not matching", modifiedMap,
|
||||
schedEventsHandler.updatedNodeToLabels);
|
||||
schedEventsHandler.receivedEvent = false;
|
||||
|
||||
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p1")));
|
||||
assertFalse("No event should be sent when there is no change in labels",
|
||||
schedEventsHandler.receivedEvent);
|
||||
schedEventsHandler.receivedEvent = false;
|
||||
|
||||
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n2:1"), toSet("p1"),
|
||||
toNodeId("n3"), toSet("p3")));
|
||||
assertTrue("Event should be sent when there is change in labels",
|
||||
schedEventsHandler.receivedEvent);
|
||||
assertEquals("Single node label mapping modified", 1,
|
||||
schedEventsHandler.updatedNodeToLabels.size());
|
||||
assertCollectionEquals(toSet("p1"),
|
||||
schedEventsHandler.updatedNodeToLabels.get(toNodeId("n2:1")));
|
||||
schedEventsHandler.receivedEvent = false;
|
||||
|
||||
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n3"), toSet("p2")));
|
||||
assertTrue("Event should be sent when there is change in labels @ HOST",
|
||||
schedEventsHandler.receivedEvent);
|
||||
assertEquals("Single node label mapping modified", 1,
|
||||
schedEventsHandler.updatedNodeToLabels.size());
|
||||
assertCollectionEquals(toSet("p2"),
|
||||
schedEventsHandler.updatedNodeToLabels.get(toNodeId("n3:1")));
|
||||
schedEventsHandler.receivedEvent = false;
|
||||
|
||||
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2")));
|
||||
assertTrue(
|
||||
"Event should be sent when labels are modified at host though labels were set @ NM level",
|
||||
schedEventsHandler.receivedEvent);
|
||||
assertEquals("Single node label mapping modified", 1,
|
||||
schedEventsHandler.updatedNodeToLabels.size());
|
||||
assertCollectionEquals(toSet("p2"),
|
||||
schedEventsHandler.updatedNodeToLabels.get(toNodeId("n1:1")));
|
||||
schedEventsHandler.receivedEvent = false;
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void testGetLabelsOnNodesWhenNodeActiveDeactive() throws Exception {
|
||||
mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p1", "p2", "p3"));
|
||||
|
|
Loading…
Reference in New Issue