YARN-4939. The decommissioning Node should keep alive during NM restart. Contributed by sandflee.

(cherry picked from commit 30ee57ceb1)
This commit is contained in:
Junping Du 2016-07-08 04:14:53 -07:00
parent 687185feb2
commit 5c0386d535
3 changed files with 37 additions and 10 deletions

View File

@ -74,6 +74,7 @@ public static List<RMNode> queryRMNodes(RMContext context,
ArrayList<RMNode> results = new ArrayList<RMNode>(); ArrayList<RMNode> results = new ArrayList<RMNode>();
if (acceptedStates.contains(NodeState.NEW) || if (acceptedStates.contains(NodeState.NEW) ||
acceptedStates.contains(NodeState.RUNNING) || acceptedStates.contains(NodeState.RUNNING) ||
acceptedStates.contains(NodeState.DECOMMISSIONING) ||
acceptedStates.contains(NodeState.UNHEALTHY)) { acceptedStates.contains(NodeState.UNHEALTHY)) {
for (RMNode rmNode : context.getRMNodes().values()) { for (RMNode rmNode : context.getRMNodes().values()) {
if (acceptedStates.contains(rmNode.getState())) { if (acceptedStates.contains(rmNode.getState())) {

View File

@ -336,7 +336,8 @@ public RegisterNodeManagerResponse registerNodeManager(
} }
// Check if this node is a 'valid' node // Check if this node is a 'valid' node
if (!this.nodesListManager.isValidNode(host)) { if (!this.nodesListManager.isValidNode(host) &&
!isNodeInDecommissioning(nodeId)) {
String message = String message =
"Disallowed NodeManager from " + host "Disallowed NodeManager from " + host
+ ", Sending SHUTDOWN signal to the NodeManager."; + ", Sending SHUTDOWN signal to the NodeManager.";

View File

@ -20,41 +20,40 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory; import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -195,6 +194,32 @@ public void testCompareRMNodeAfterReconnect() throws Exception {
scheduler.stop(); scheduler.stop();
} }
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testDecommissioningNodeReconnect()
throws Exception {
MockRM rm = new MockRM();
rm.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm1.registerNode();
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.getRMContext().getNodesListManager().getHostsReader().
getExcludedHosts().add("127.0.0.1");
rm.getRMContext().getDispatcher().getEventHandler().handle(
new RMNodeEvent(nm1.getNodeId(),
RMNodeEventType.GRACEFUL_DECOMMISSION));
rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
MockNM nm2 =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
RegisterNodeManagerResponse response = nm2.registerNode();
// not SHUTDOWN
Assert.assertTrue(response.getNodeAction().equals(NodeAction.NORMAL));
rm.stop();
}
@Test(timeout = 10000) @Test(timeout = 10000)
public void testRMNodeStatusAfterReconnect() throws Exception { public void testRMNodeStatusAfterReconnect() throws Exception {
// The node(127.0.0.1:1234) reconnected with RM. When it registered with // The node(127.0.0.1:1234) reconnected with RM. When it registered with