YARN-3802. Two RMNodes for the same NodeId are used in RM sometimes
after NM is reconnected. Contributed by zhihai xu
(cherry picked from commit 5b5bb8dcdc
)
Conflicts:
hadoop-yarn-project/CHANGES.txt
This commit is contained in:
parent
4770f190b8
commit
1484ebb602
|
@ -36,6 +36,9 @@ Release 2.6.2 - UNRELEASED
|
|||
YARN-3780. Should use equals when compare Resource in RMNodeImpl#ReconnectNodeTransition.
|
||||
(zhihai xu via devaraj)
|
||||
|
||||
YARN-3802. Two RMNodes for the same NodeId are used in RM sometimes
|
||||
after NM is reconnected. (zhihai xu via xgong)
|
||||
|
||||
Release 2.6.1 - 2015-09-23
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -569,10 +569,14 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
if (rmNode.getHttpPort() == newNode.getHttpPort()) {
|
||||
// Reset heartbeat ID since node just restarted.
|
||||
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
|
||||
if (!rmNode.getTotalCapability().equals(
|
||||
newNode.getTotalCapability())) {
|
||||
rmNode.totalCapability = newNode.getTotalCapability();
|
||||
}
|
||||
if (rmNode.getState().equals(NodeState.RUNNING)) {
|
||||
// Only add new node if old state is RUNNING
|
||||
// Only add old node if old state is RUNNING
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodeAddedSchedulerEvent(newNode));
|
||||
new NodeAddedSchedulerEvent(rmNode));
|
||||
}
|
||||
} else {
|
||||
// Reconnected node differs, so replace old node and start new node
|
||||
|
|
|
@ -25,6 +25,9 @@ import org.junit.Assert;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||
|
@ -32,6 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
|
@ -39,10 +43,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDi
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -51,6 +58,8 @@ public class TestNMReconnect {
|
|||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
private List<RMNodeEvent> rmNodeEvents = new ArrayList<RMNodeEvent>();
|
||||
private Dispatcher dispatcher;
|
||||
private RMContextImpl context;
|
||||
|
||||
private class TestRMNodeEventDispatcher implements
|
||||
EventHandler<RMNodeEvent> {
|
||||
|
@ -68,12 +77,12 @@ public class TestNMReconnect {
|
|||
public void setUp() {
|
||||
Configuration conf = new Configuration();
|
||||
// Dispatcher that processes events inline
|
||||
Dispatcher dispatcher = new InlineDispatcher();
|
||||
dispatcher = new InlineDispatcher();
|
||||
|
||||
dispatcher.register(RMNodeEventType.class,
|
||||
new TestRMNodeEventDispatcher());
|
||||
|
||||
RMContext context = new RMContextImpl(dispatcher, null,
|
||||
context = new RMContextImpl(dispatcher, null,
|
||||
null, null, null, null, null, null, null, null);
|
||||
dispatcher.register(SchedulerEventType.class,
|
||||
new InlineDispatcher.EmptyEventHandler());
|
||||
|
@ -99,6 +108,11 @@ public class TestNMReconnect {
|
|||
resourceTrackerService.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
resourceTrackerService.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReconnect() throws Exception {
|
||||
String hostname1 = "localhost1";
|
||||
|
@ -126,4 +140,53 @@ public class TestNMReconnect {
|
|||
Assert.assertEquals(RMNodeEventType.RECONNECTED,
|
||||
rmNodeEvents.get(0).getType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompareRMNodeAfterReconnect() throws Exception {
|
||||
Configuration yarnConf = new YarnConfiguration();
|
||||
CapacityScheduler scheduler = new CapacityScheduler();
|
||||
scheduler.setConf(yarnConf);
|
||||
ConfigurationProvider configurationProvider =
|
||||
ConfigurationProviderFactory.getConfigurationProvider(yarnConf);
|
||||
configurationProvider.init(yarnConf);
|
||||
context.setConfigurationProvider(configurationProvider);
|
||||
RMNodeLabelsManager nlm = new RMNodeLabelsManager();
|
||||
nlm.init(yarnConf);
|
||||
nlm.start();
|
||||
context.setNodeLabelManager(nlm);
|
||||
scheduler.setRMContext(context);
|
||||
scheduler.init(yarnConf);
|
||||
scheduler.start();
|
||||
dispatcher.register(SchedulerEventType.class, scheduler);
|
||||
|
||||
String hostname1 = "localhost1";
|
||||
Resource capability = BuilderUtils.newResource(4096, 4);
|
||||
|
||||
RegisterNodeManagerRequest request1 = recordFactory
|
||||
.newRecordInstance(RegisterNodeManagerRequest.class);
|
||||
NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
|
||||
request1.setNodeId(nodeId1);
|
||||
request1.setHttpPort(0);
|
||||
request1.setResource(capability);
|
||||
resourceTrackerService.registerNodeManager(request1);
|
||||
Assert.assertNotNull(context.getRMNodes().get(nodeId1));
|
||||
// verify Scheduler and RMContext use same RMNode reference.
|
||||
Assert.assertTrue(scheduler.getSchedulerNode(nodeId1).getRMNode() ==
|
||||
context.getRMNodes().get(nodeId1));
|
||||
Assert.assertEquals(context.getRMNodes().get(nodeId1).
|
||||
getTotalCapability(), capability);
|
||||
Resource capability1 = BuilderUtils.newResource(2048, 2);
|
||||
request1.setResource(capability1);
|
||||
resourceTrackerService.registerNodeManager(request1);
|
||||
Assert.assertNotNull(context.getRMNodes().get(nodeId1));
|
||||
// verify Scheduler and RMContext use same RMNode reference
|
||||
// after reconnect.
|
||||
Assert.assertTrue(scheduler.getSchedulerNode(nodeId1).getRMNode() ==
|
||||
context.getRMNodes().get(nodeId1));
|
||||
// verify RMNode's capability is changed.
|
||||
Assert.assertEquals(context.getRMNodes().get(nodeId1).
|
||||
getTotalCapability(), capability1);
|
||||
nlm.stop();
|
||||
scheduler.stop();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue