From 5ac97bbe25f051f1d956f4fd2b45325863a86a96 Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Thu, 31 Oct 2013 02:57:33 +0000 Subject: [PATCH] YARN-1343. NodeManagers additions/restarts are not reported as node updates in AllocateResponse responses to AMs. (tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1537369 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/NodesListManager.java | 19 ++- .../resourcemanager/rmnode/RMNodeImpl.java | 10 +- .../TestRMNodeTransitions.java | 77 ++++++++++- .../resourcetracker/TestNMReconnect.java | 124 ++++++++++++++++++ 5 files changed, 219 insertions(+), 14 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ba939b7faa9..952d2f36f73 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -156,6 +156,9 @@ Release 2.2.1 - UNRELEASED YARN-1358. TestYarnCLI fails on Windows due to line endings. (Chuan Liu via cnauroth) + YARN-1343. NodeManagers additions/restarts are not reported as node updates + in AllocateResponse responses to AMs. (tucu) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index e9a043693e1..4249980c7d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -160,17 +160,14 @@ public class NodesListManager extends AbstractService implements if (unusableRMNodesConcurrentSet.contains(eventNode)) { LOG.debug(eventNode + " reported usable"); unusableRMNodesConcurrentSet.remove(eventNode); - for (RMApp app : rmContext.getRMApps().values()) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, - RMAppNodeUpdateType.NODE_USABLE)); - } - } else { - LOG.warn(eventNode - + " reported usable without first reporting unusable"); + } + for (RMApp app : rmContext.getRMApps().values()) { + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, + RMAppNodeUpdateType.NODE_USABLE)); } break; default: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 00a5cb4f209..74291005a44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -438,7 +438,10 @@ public class RMNodeImpl implements RMNode, EventHandler { rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(rmNode)); - + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); + String host = rmNode.nodeId.getHost(); if (rmNode.context.getInactiveRMNodes().containsKey(host)) { // Old node rejoining @@ -471,7 +474,7 @@ public class RMNodeImpl implements RMNode, EventHandler { // Only add new node if old state is not UNHEALTHY rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(rmNode)); - } + } } else { // Reconnected node differs, so replace old node and start new node switch (rmNode.getState()) { @@ -486,6 +489,9 @@ public class RMNodeImpl implements RMNode, EventHandler { rmNode.context.getDispatcher().getEventHandler().handle( new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED)); } + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index c060bb603c8..479128a4f22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainer import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -79,6 +81,18 @@ public class TestRMNodeTransitions { } } + private NodesListManagerEvent nodesListManagerEvent = null; + + private class TestNodeListManagerEventDispatcher implements + EventHandler { + + @Override + public void handle(NodesListManagerEvent event) { + nodesListManagerEvent = event; + } + + } + @Before public void setUp() throws Exception { InlineDispatcher rmDispatcher = new InlineDispatcher(); @@ -109,8 +123,12 @@ public class TestRMNodeTransitions { rmDispatcher.register(SchedulerEventType.class, new TestSchedulerEventDispatcher()); + rmDispatcher.register(NodesListManagerEventType.class, + new TestNodeListManagerEventDispatcher()); + NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); + nodesListManagerEvent = null; } @@ -431,8 +449,9 @@ public class TestRMNodeTransitions { private RMNodeImpl getRunningNode() { NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); + Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, - null, null, null); + null, capability, null); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; @@ -447,4 +466,60 @@ public class TestRMNodeTransitions { Assert.assertEquals(NodeState.UNHEALTHY, node.getState()); return node; } + + + private RMNodeImpl getNewNode() { + NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); + RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); + return node; + } + + @Test + public void testAdd() { + RMNodeImpl node = getNewNode(); + ClusterMetrics cm = ClusterMetrics.getMetrics(); + int initialActive = cm.getNumActiveNMs(); + int initialLost = cm.getNumLostNMs(); + int initialUnhealthy = cm.getUnhealthyNMs(); + int initialDecommissioned = cm.getNumDecommisionedNMs(); + int initialRebooted = cm.getNumRebootedNMs(); + node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); + Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); + Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); + Assert.assertEquals("Unhealthy Nodes", + initialUnhealthy, cm.getUnhealthyNMs()); + Assert.assertEquals("Decommissioned Nodes", + initialDecommissioned, cm.getNumDecommisionedNMs()); + Assert.assertEquals("Rebooted Nodes", + initialRebooted, cm.getNumRebootedNMs()); + Assert.assertEquals(NodeState.RUNNING, node.getState()); + Assert.assertNotNull(nodesListManagerEvent); + Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, + nodesListManagerEvent.getType()); + } + + @Test + public void testReconnect() { + RMNodeImpl node = getRunningNode(); + ClusterMetrics cm = ClusterMetrics.getMetrics(); + int initialActive = cm.getNumActiveNMs(); + int initialLost = cm.getNumLostNMs(); + int initialUnhealthy = cm.getUnhealthyNMs(); + int initialDecommissioned = cm.getNumDecommisionedNMs(); + int initialRebooted = cm.getNumRebootedNMs(); + node.handle(new RMNodeReconnectEvent(node.getNodeID(), node)); + Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs()); + Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); + Assert.assertEquals("Unhealthy Nodes", + initialUnhealthy, cm.getUnhealthyNMs()); + Assert.assertEquals("Decommissioned Nodes", + initialDecommissioned, cm.getNumDecommisionedNMs()); + Assert.assertEquals("Rebooted Nodes", + initialRebooted, cm.getNumRebootedNMs()); + Assert.assertEquals(NodeState.RUNNING, node.getState()); + Assert.assertNotNull(nodesListManagerEvent); + Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, + nodesListManagerEvent.getType()); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java new file mode 100644 index 00000000000..cbb23740de4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Before; +import org.junit.Test; + +public class TestNMReconnect { + private static final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + private RMNodeEvent rmNodeEvent = null; + + private class TestRMNodeEventDispatcher implements + EventHandler { + + @Override + public void handle(RMNodeEvent event) { + rmNodeEvent = event; + } + + } + + ResourceTrackerService resourceTrackerService; + + @Before + public void setUp() { + Configuration conf = new Configuration(); + // Dispatcher that processes events inline + Dispatcher dispatcher = new InlineDispatcher(); + + dispatcher.register(RMNodeEventType.class, + new TestRMNodeEventDispatcher()); + + RMContext context = new RMContextImpl(dispatcher, null, + null, null, null, null, null, null, null); + dispatcher.register(SchedulerEventType.class, + new InlineDispatcher.EmptyEventHandler()); + dispatcher.register(RMNodeEventType.class, + new NodeEventDispatcher(context)); + NMLivelinessMonitor nmLivelinessMonitor = new NMLivelinessMonitor( + dispatcher); + nmLivelinessMonitor.init(conf); + nmLivelinessMonitor.start(); + NodesListManager nodesListManager = new NodesListManager(context); + nodesListManager.init(conf); + RMContainerTokenSecretManager containerTokenSecretManager = + new RMContainerTokenSecretManager(conf); + containerTokenSecretManager.start(); + NMTokenSecretManagerInRM nmTokenSecretManager = + new NMTokenSecretManagerInRM(conf); + nmTokenSecretManager.start(); + resourceTrackerService = new ResourceTrackerService(context, + nodesListManager, nmLivelinessMonitor, containerTokenSecretManager, + nmTokenSecretManager); + + resourceTrackerService.init(conf); + resourceTrackerService.start(); + } + + @Test + public void testReconnect() throws Exception { + String hostname1 = "localhost1"; + Resource capability = BuilderUtils.newResource(1024, 1); + + RegisterNodeManagerRequest request1 = recordFactory + .newRecordInstance(RegisterNodeManagerRequest.class); + NodeId nodeId1 = NodeId.newInstance(hostname1, 0); + request1.setNodeId(nodeId1); + request1.setHttpPort(0); + request1.setResource(capability); + resourceTrackerService.registerNodeManager(request1); + + Assert.assertEquals(RMNodeEventType.STARTED, rmNodeEvent.getType()); + + rmNodeEvent = null; + resourceTrackerService.registerNodeManager(request1); + Assert.assertEquals(RMNodeEventType.RECONNECTED, rmNodeEvent.getType()); + + rmNodeEvent = null; + resourceTrackerService.registerNodeManager(request1); + capability = BuilderUtils.newResource(1024, 2); + request1.setResource(capability); + Assert.assertEquals(RMNodeEventType.RECONNECTED, rmNodeEvent.getType()); + } +}