YARN-3802. Two RMNodes for the same NodeId are used in RM sometimes
after NM is reconnected. Contributed by zhihai xu
(cherry picked from commit 5b5bb8dcdc)
Conflicts:
hadoop-yarn-project/CHANGES.txt
This commit is contained in:
parent
aaff9dc39b
commit
784d00e0e2
|
@ -103,6 +103,9 @@ Release 2.7.2 - UNRELEASED
|
||||||
YARN-3780. Should use equals when compare Resource in RMNodeImpl#ReconnectNodeTransition.
|
YARN-3780. Should use equals when compare Resource in RMNodeImpl#ReconnectNodeTransition.
|
||||||
(zhihai xu via devaraj)
|
(zhihai xu via devaraj)
|
||||||
|
|
||||||
|
YARN-3802. Two RMNodes for the same NodeId are used in RM sometimes
|
||||||
|
after NM is reconnected. (zhihai xu via xgong)
|
||||||
|
|
||||||
Release 2.7.1 - 2015-07-06
|
Release 2.7.1 - 2015-07-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -575,10 +575,14 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
if (rmNode.getHttpPort() == newNode.getHttpPort()) {
|
if (rmNode.getHttpPort() == newNode.getHttpPort()) {
|
||||||
// Reset heartbeat ID since node just restarted.
|
// Reset heartbeat ID since node just restarted.
|
||||||
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
|
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
|
||||||
|
if (!rmNode.getTotalCapability().equals(
|
||||||
|
newNode.getTotalCapability())) {
|
||||||
|
rmNode.totalCapability = newNode.getTotalCapability();
|
||||||
|
}
|
||||||
if (rmNode.getState().equals(NodeState.RUNNING)) {
|
if (rmNode.getState().equals(NodeState.RUNNING)) {
|
||||||
// Only add new node if old state is RUNNING
|
// Only add old node if old state is RUNNING
|
||||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||||
new NodeAddedSchedulerEvent(newNode));
|
new NodeAddedSchedulerEvent(rmNode));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Reconnected node differs, so replace old node and start new node
|
// Reconnected node differs, so replace old node and start new node
|
||||||
|
|
|
@ -25,6 +25,9 @@ import org.junit.Assert;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
|
||||||
|
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||||
|
@ -32,6 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||||
|
@ -39,10 +43,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDi
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -51,6 +58,8 @@ public class TestNMReconnect {
|
||||||
RecordFactoryProvider.getRecordFactory(null);
|
RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
|
||||||
private List<RMNodeEvent> rmNodeEvents = new ArrayList<RMNodeEvent>();
|
private List<RMNodeEvent> rmNodeEvents = new ArrayList<RMNodeEvent>();
|
||||||
|
private Dispatcher dispatcher;
|
||||||
|
private RMContextImpl context;
|
||||||
|
|
||||||
private class TestRMNodeEventDispatcher implements
|
private class TestRMNodeEventDispatcher implements
|
||||||
EventHandler<RMNodeEvent> {
|
EventHandler<RMNodeEvent> {
|
||||||
|
@ -68,12 +77,12 @@ public class TestNMReconnect {
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
// Dispatcher that processes events inline
|
// Dispatcher that processes events inline
|
||||||
Dispatcher dispatcher = new InlineDispatcher();
|
dispatcher = new InlineDispatcher();
|
||||||
|
|
||||||
dispatcher.register(RMNodeEventType.class,
|
dispatcher.register(RMNodeEventType.class,
|
||||||
new TestRMNodeEventDispatcher());
|
new TestRMNodeEventDispatcher());
|
||||||
|
|
||||||
RMContext context = new RMContextImpl(dispatcher, null,
|
context = new RMContextImpl(dispatcher, null,
|
||||||
null, null, null, null, null, null, null, null);
|
null, null, null, null, null, null, null, null);
|
||||||
dispatcher.register(SchedulerEventType.class,
|
dispatcher.register(SchedulerEventType.class,
|
||||||
new InlineDispatcher.EmptyEventHandler());
|
new InlineDispatcher.EmptyEventHandler());
|
||||||
|
@ -99,6 +108,11 @@ public class TestNMReconnect {
|
||||||
resourceTrackerService.start();
|
resourceTrackerService.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Stops the resourceTrackerService after each test.
 */
@After
|
||||||
|
public void tearDown() {
|
||||||
|
resourceTrackerService.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReconnect() throws Exception {
|
public void testReconnect() throws Exception {
|
||||||
String hostname1 = "localhost1";
|
String hostname1 = "localhost1";
|
||||||
|
@ -126,4 +140,53 @@ public class TestNMReconnect {
|
||||||
Assert.assertEquals(RMNodeEventType.RECONNECTED,
|
Assert.assertEquals(RMNodeEventType.RECONNECTED,
|
||||||
rmNodeEvents.get(0).getType());
|
rmNodeEvents.get(0).getType());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Checks that the scheduler and the RMContext hold the same RMNode instance
 * for a NodeId, both after the first registration and after the same node
 * registers a second time with a different resource, and that the RMNode's
 * total capability reflects the resource from the latest registration.
 *
 * @throws Exception if scheduler setup or node registration fails
 */
@Test
|
||||||
|
public void testCompareRMNodeAfterReconnect() throws Exception {
|
||||||
|
// Build a CapacityScheduler and attach it to the shared test context.
Configuration yarnConf = new YarnConfiguration();
|
||||||
|
CapacityScheduler scheduler = new CapacityScheduler();
|
||||||
|
scheduler.setConf(yarnConf);
|
||||||
|
ConfigurationProvider configurationProvider =
|
||||||
|
ConfigurationProviderFactory.getConfigurationProvider(yarnConf);
|
||||||
|
configurationProvider.init(yarnConf);
|
||||||
|
context.setConfigurationProvider(configurationProvider);
|
||||||
|
// The node labels manager is initialized and started before being handed
// to the context.
RMNodeLabelsManager nlm = new RMNodeLabelsManager();
|
||||||
|
nlm.init(yarnConf);
|
||||||
|
nlm.start();
|
||||||
|
context.setNodeLabelManager(nlm);
|
||||||
|
scheduler.setRMContext(context);
|
||||||
|
scheduler.init(yarnConf);
|
||||||
|
scheduler.start();
|
||||||
|
// Register the scheduler as a handler for scheduler events on the
// test dispatcher.
dispatcher.register(SchedulerEventType.class, scheduler);
|
||||||
|
|
||||||
|
String hostname1 = "localhost1";
|
||||||
|
Resource capability = BuilderUtils.newResource(4096, 4);
|
||||||
|
|
||||||
|
// First registration of the node with the initial resource.
RegisterNodeManagerRequest request1 = recordFactory
|
||||||
|
.newRecordInstance(RegisterNodeManagerRequest.class);
|
||||||
|
NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
|
||||||
|
request1.setNodeId(nodeId1);
|
||||||
|
request1.setHttpPort(0);
|
||||||
|
request1.setResource(capability);
|
||||||
|
resourceTrackerService.registerNodeManager(request1);
|
||||||
|
Assert.assertNotNull(context.getRMNodes().get(nodeId1));
|
||||||
|
// verify Scheduler and RMContext use same RMNode reference.
|
||||||
|
// NOTE(review): this is an identity (==) check written as assertTrue;
// Assert.assertSame would report both objects on failure.
Assert.assertTrue(scheduler.getSchedulerNode(nodeId1).getRMNode() ==
|
||||||
|
context.getRMNodes().get(nodeId1));
|
||||||
|
// NOTE(review): arguments are passed as (actual, expected); JUnit's
// assertEquals signature is (expected, actual), which only affects the
// wording of a failure message.
Assert.assertEquals(context.getRMNodes().get(nodeId1).
|
||||||
|
getTotalCapability(), capability);
|
||||||
|
// Second registration: same request object (same NodeId and HTTP port),
// but with a different resource.
Resource capability1 = BuilderUtils.newResource(2048, 2);
|
||||||
|
request1.setResource(capability1);
|
||||||
|
resourceTrackerService.registerNodeManager(request1);
|
||||||
|
Assert.assertNotNull(context.getRMNodes().get(nodeId1));
|
||||||
|
// verify Scheduler and RMContext use same RMNode reference
|
||||||
|
// after reconnect.
|
||||||
|
Assert.assertTrue(scheduler.getSchedulerNode(nodeId1).getRMNode() ==
|
||||||
|
context.getRMNodes().get(nodeId1));
|
||||||
|
// verify RMNode's capability is changed.
|
||||||
|
Assert.assertEquals(context.getRMNodes().get(nodeId1).
|
||||||
|
getTotalCapability(), capability1);
|
||||||
|
// NOTE(review): these stop() calls are not in a finally block, so they
// are skipped when an assertion above fails.
nlm.stop();
|
||||||
|
scheduler.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue