From 29a582ada0fe195989eca25e5a995895e178f4ea Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Tue, 6 Oct 2015 11:55:51 -0700 Subject: [PATCH] YARN-4215. RMNodeLabels Manager Need to verify and replace node labels for the only modified Node Label Mappings in the request. (Naganarasimha G R via wangda) --- hadoop-yarn-project/CHANGES.txt | 3 + .../nodelabels/RMNodeLabelsManager.java | 44 ++++++++- .../scheduler/capacity/CapacityScheduler.java | 8 +- .../nodelabels/TestRMNodeLabelsManager.java | 97 ++++++++++++++++++- 4 files changed, 139 insertions(+), 13 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2ff2f5c77e0..9d12b827f39 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -489,6 +489,9 @@ Release 2.8.0 - UNRELEASED YARN-4176. Resync NM nodelabels with RM periodically for distributed nodelabels. (Bibin A Chundatt via wangda) + YARN-4215. RMNodeLabels Manager Need to verify and replace node labels for the + only modified Node Label Mappings in the request. (Naganarasimha G R via wangda) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java index 8587bdaef4b..62922adf74e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -163,13 +163,23 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) try { writeLock.lock(); - // get nodesCollection before edition - Map before = cloneNodeMap(replaceLabelsToNode.keySet()); + Map> effectiveModifiedLabelMappings = + getModifiedNodeLabelsMappings(replaceLabelsToNode); - super.replaceLabelsOnNode(replaceLabelsToNode); + if(effectiveModifiedLabelMappings.isEmpty()) { + LOG.info("No Modified Node label Mapping to replace"); + return; + } + + // get nodesCollection before edition + Map before = + cloneNodeMap(effectiveModifiedLabelMappings.keySet()); + + super.replaceLabelsOnNode(effectiveModifiedLabelMappings); // get nodesCollection after edition - Map after = cloneNodeMap(replaceLabelsToNode.keySet()); + Map after = + cloneNodeMap(effectiveModifiedLabelMappings.keySet()); // update running nodes resources updateResourceMappings(before, after); @@ -178,6 +188,32 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) } } + private Map> getModifiedNodeLabelsMappings( + Map> replaceLabelsToNode) { + Map> effectiveModifiedLabels = new HashMap<>(); + for (Entry> nodeLabelMappingEntry : replaceLabelsToNode + .entrySet()) { + NodeId nodeId = nodeLabelMappingEntry.getKey(); + Set modifiedNodeLabels = nodeLabelMappingEntry.getValue(); + Set labelsBeforeModification = null; + Host host = nodeCollections.get(nodeId.getHost()); + if (host == null) { + effectiveModifiedLabels.put(nodeId, modifiedNodeLabels); + continue; + } else if (nodeId.getPort() == WILDCARD_PORT) { + labelsBeforeModification = host.labels; + } else if (host.nms.get(nodeId) != null) { + labelsBeforeModification = host.nms.get(nodeId).labels; + } + if (labelsBeforeModification == null + || labelsBeforeModification.size() != modifiedNodeLabels.size() + || !labelsBeforeModification.containsAll(modifiedNodeLabels)) { + effectiveModifiedLabels.put(nodeId, modifiedNodeLabels); + } + } + return effectiveModifiedLabels; + } + /* * Following methods are used for setting if a node is up and running, and it * will update running nodes resource diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 0fd20f876dd..ee31bea5ec1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -100,9 +100,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -1042,12 +1042,6 @@ private synchronized void updateLabelsOnNode(NodeId nodeId, return; } - // labels is same, we don't need do update - if (node.getLabels().size() == newLabels.size() - && node.getLabels().containsAll(newLabels)) { - return; - } - // Get new partition, we have only one partition per node String newPartition; if (newLabels.isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java index 79408655e56..47e48305836 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java @@ -17,6 +17,11 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; @@ -31,10 +36,17 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; -import org.apache.hadoop.yarn.nodelabels.RMNodeLabel; import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.apache.hadoop.yarn.nodelabels.RMNodeLabel; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Assert; @@ -434,7 +446,88 @@ public void testRemoveLabelsFromNode() throws Exception { Assert.fail("IOException from removeLabelsFromNode " + e); } } - + + private static class SchedulerEventHandler + implements EventHandler { + Map> updatedNodeToLabels = new HashMap<>(); + boolean receivedEvent; + + @Override + public void handle(SchedulerEvent event) { + switch (event.getType()) { + case NODE_LABELS_UPDATE: + receivedEvent = true; + updatedNodeToLabels = + ((NodeLabelsUpdateSchedulerEvent) event).getUpdatedNodeToLabels(); + break; + default: + break; + } + } + } + + @Test + public void testReplaceLabelsFromNode() throws Exception { + RMContext rmContext = mock(RMContext.class); + Dispatcher syncDispatcher = new InlineDispatcher(); + SchedulerEventHandler schedEventsHandler = new SchedulerEventHandler(); + syncDispatcher.register(SchedulerEventType.class, schedEventsHandler); + when(rmContext.getDispatcher()).thenReturn(syncDispatcher); + mgr.setRMContext(rmContext); + + mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p1", "p2", "p3")); + mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n2", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n3", 1), SMALL_RESOURCE); + + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p1"), + toNodeId("n2:1"), toSet("p2"), toNodeId("n3"), toSet("p3"))); + assertTrue("Event should be sent when there is change in labels", + schedEventsHandler.receivedEvent); + assertEquals("3 node label mapping modified", 3, + schedEventsHandler.updatedNodeToLabels.size()); + ImmutableMap> modifiedMap = + ImmutableMap.of(toNodeId("n1:1"), toSet("p1"), toNodeId("n2:1"), + toSet("p2"), toNodeId("n3:1"), toSet("p3")); + assertEquals("Node label mapping is not matching", modifiedMap, + schedEventsHandler.updatedNodeToLabels); + schedEventsHandler.receivedEvent = false; + + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p1"))); + assertFalse("No event should be sent when there is no change in labels", + schedEventsHandler.receivedEvent); + schedEventsHandler.receivedEvent = false; + + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n2:1"), toSet("p1"), + toNodeId("n3"), toSet("p3"))); + assertTrue("Event should be sent when there is change in labels", + schedEventsHandler.receivedEvent); + assertEquals("Single node label mapping modified", 1, + schedEventsHandler.updatedNodeToLabels.size()); + assertCollectionEquals(toSet("p1"), + schedEventsHandler.updatedNodeToLabels.get(toNodeId("n2:1"))); + schedEventsHandler.receivedEvent = false; + + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n3"), toSet("p2"))); + assertTrue("Event should be sent when there is change in labels @ HOST", + schedEventsHandler.receivedEvent); + assertEquals("Single node label mapping modified", 1, + schedEventsHandler.updatedNodeToLabels.size()); + assertCollectionEquals(toSet("p2"), + schedEventsHandler.updatedNodeToLabels.get(toNodeId("n3:1"))); + schedEventsHandler.receivedEvent = false; + + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2"))); + assertTrue( + "Event should be sent when labels are modified at host though labels were set @ NM level", + schedEventsHandler.receivedEvent); + assertEquals("Single node label mapping modified", 1, + schedEventsHandler.updatedNodeToLabels.size()); + assertCollectionEquals(toSet("p2"), + schedEventsHandler.updatedNodeToLabels.get(toNodeId("n1:1"))); + schedEventsHandler.receivedEvent = false; + } + @Test(timeout = 5000) public void testGetLabelsOnNodesWhenNodeActiveDeactive() throws Exception { mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p1", "p2", "p3"));