From 1484ebb60248f79f7286682f8813c6de095366f2 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 8 Oct 2015 16:01:20 +0000 Subject: [PATCH] YARN-3802. Two RMNodes for the same NodeId are used in RM sometimes after NM is reconnected. Contributed by zhihai xu (cherry picked from commit 5b5bb8dcdc888ba1ebc7e4eba0fa0e7e79edda9a) Conflicts: hadoop-yarn-project/CHANGES.txt --- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/rmnode/RMNodeImpl.java | 8 ++- .../resourcetracker/TestNMReconnect.java | 67 ++++++++++++++++++- 3 files changed, 74 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 34550fbeff3..3a2ab198718 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -36,6 +36,9 @@ Release 2.6.2 - UNRELEASED YARN-3780. Should use equals when compare Resource in RMNodeImpl#ReconnectNodeTransition. (zhihai xu via devaraj) + YARN-3802. Two RMNodes for the same NodeId are used in RM sometimes + after NM is reconnected. (zhihai xu via xgong) + Release 2.6.1 - 2015-09-23 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index b898b98ad74..6ddfaf8efa8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -569,10 +569,14 @@ public class RMNodeImpl implements RMNode, EventHandler { if (rmNode.getHttpPort() == newNode.getHttpPort()) { // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); + if (!rmNode.getTotalCapability().equals( + newNode.getTotalCapability())) { + rmNode.totalCapability = newNode.getTotalCapability(); + } if (rmNode.getState().equals(NodeState.RUNNING)) { - // Only add new node if old state is RUNNING + // Only add old node if old state is RUNNING rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(newNode)); + new NodeAddedSchedulerEvent(rmNode)); } } else { // Reconnected node differs, so replace old node and start new node diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index d16d5510365..b525efced6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -25,6 +25,9 @@ import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.ConfigurationProvider; +import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; @@ -32,6 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; @@ -39,10 +43,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDi import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -51,6 +58,8 @@ public class TestNMReconnect { RecordFactoryProvider.getRecordFactory(null); private List rmNodeEvents = new ArrayList(); + private Dispatcher dispatcher; + private RMContextImpl context; private class TestRMNodeEventDispatcher implements EventHandler { @@ -68,12 +77,12 @@ public class TestNMReconnect { public void setUp() { Configuration conf = new Configuration(); // Dispatcher that processes events inline - Dispatcher dispatcher = new InlineDispatcher(); + dispatcher = new InlineDispatcher(); dispatcher.register(RMNodeEventType.class, new TestRMNodeEventDispatcher()); - RMContext context = new RMContextImpl(dispatcher, null, + context = new RMContextImpl(dispatcher, null, null, null, null, null, null, null, null, null); dispatcher.register(SchedulerEventType.class, new InlineDispatcher.EmptyEventHandler()); @@ -99,6 +108,11 @@ public class TestNMReconnect { resourceTrackerService.start(); } + @After + public void tearDown() { + resourceTrackerService.stop(); + } + @Test public void testReconnect() throws Exception { String hostname1 = "localhost1"; @@ -126,4 +140,53 @@ public class TestNMReconnect { Assert.assertEquals(RMNodeEventType.RECONNECTED, rmNodeEvents.get(0).getType()); } + + @Test + public void testCompareRMNodeAfterReconnect() throws Exception { + Configuration yarnConf = new YarnConfiguration(); + CapacityScheduler scheduler = new CapacityScheduler(); + scheduler.setConf(yarnConf); + ConfigurationProvider configurationProvider = + ConfigurationProviderFactory.getConfigurationProvider(yarnConf); + configurationProvider.init(yarnConf); + context.setConfigurationProvider(configurationProvider); + RMNodeLabelsManager nlm = new RMNodeLabelsManager(); + nlm.init(yarnConf); + nlm.start(); + context.setNodeLabelManager(nlm); + scheduler.setRMContext(context); + scheduler.init(yarnConf); + scheduler.start(); + dispatcher.register(SchedulerEventType.class, scheduler); + + String hostname1 = "localhost1"; + Resource capability = BuilderUtils.newResource(4096, 4); + + RegisterNodeManagerRequest request1 = recordFactory + .newRecordInstance(RegisterNodeManagerRequest.class); + NodeId nodeId1 = NodeId.newInstance(hostname1, 0); + request1.setNodeId(nodeId1); + request1.setHttpPort(0); + request1.setResource(capability); + resourceTrackerService.registerNodeManager(request1); + Assert.assertNotNull(context.getRMNodes().get(nodeId1)); + // verify Scheduler and RMContext use same RMNode reference. + Assert.assertTrue(scheduler.getSchedulerNode(nodeId1).getRMNode() == + context.getRMNodes().get(nodeId1)); + Assert.assertEquals(context.getRMNodes().get(nodeId1). + getTotalCapability(), capability); + Resource capability1 = BuilderUtils.newResource(2048, 2); + request1.setResource(capability1); + resourceTrackerService.registerNodeManager(request1); + Assert.assertNotNull(context.getRMNodes().get(nodeId1)); + // verify Scheduler and RMContext use same RMNode reference + // after reconnect. + Assert.assertTrue(scheduler.getSchedulerNode(nodeId1).getRMNode() == + context.getRMNodes().get(nodeId1)); + // verify RMNode's capability is changed. + Assert.assertEquals(context.getRMNodes().get(nodeId1). + getTotalCapability(), capability1); + nlm.stop(); + scheduler.stop(); + } }