YARN-4761. NMs reconnecting with changed capabilities can lead to wrong cluster resource calculations on fair scheduler. Contributed by Sangjin Lee
(cherry picked from commit e1ccc9622b
)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
This commit is contained in:
parent
3d8ae9c577
commit
1a99d5ab97
|
@ -888,7 +888,7 @@ public class FairScheduler extends
|
||||||
} else {
|
} else {
|
||||||
nodesPerRack.put(rackName, 1);
|
nodesPerRack.put(rackName, 1);
|
||||||
}
|
}
|
||||||
Resources.addTo(clusterResource, node.getTotalCapability());
|
Resources.addTo(clusterResource, schedulerNode.getTotalResource());
|
||||||
updateMaximumAllocation(schedulerNode, true);
|
updateMaximumAllocation(schedulerNode, true);
|
||||||
|
|
||||||
triggerUpdate();
|
triggerUpdate();
|
||||||
|
@ -908,7 +908,7 @@ public class FairScheduler extends
|
||||||
if (node == null) {
|
if (node == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Resources.subtractFrom(clusterResource, rmNode.getTotalCapability());
|
Resources.subtractFrom(clusterResource, node.getTotalResource());
|
||||||
updateRootQueueMetrics();
|
updateRootQueueMetrics();
|
||||||
|
|
||||||
triggerUpdate();
|
triggerUpdate();
|
||||||
|
|
|
@ -28,6 +28,8 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
@ -38,11 +40,24 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
|
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
@ -51,14 +66,20 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
||||||
|
@ -616,4 +637,114 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
||||||
Assert.assertEquals(expectedMaximumResource.getVirtualCores(),
|
Assert.assertEquals(expectedMaximumResource.getVirtualCores(),
|
||||||
schedulerMaximumResourceCapability.getVirtualCores());
|
schedulerMaximumResourceCapability.getVirtualCores());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class SleepHandler implements EventHandler<SchedulerEvent> {
|
||||||
|
boolean sleepFlag = false;
|
||||||
|
int sleepTime = 20;
|
||||||
|
@Override
|
||||||
|
public void handle(SchedulerEvent event) {
|
||||||
|
try {
|
||||||
|
if (sleepFlag) {
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
}
|
||||||
|
} catch(InterruptedException ie) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ResourceTrackerService getPrivateResourceTrackerService(
|
||||||
|
Dispatcher privateDispatcher, ResourceManager rm,
|
||||||
|
SleepHandler sleepHandler) {
|
||||||
|
Configuration conf = getConf();
|
||||||
|
|
||||||
|
RMContext privateContext =
|
||||||
|
new RMContextImpl(privateDispatcher, null, null, null, null, null, null,
|
||||||
|
null, null, null);
|
||||||
|
privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class));
|
||||||
|
|
||||||
|
privateDispatcher.register(SchedulerEventType.class, sleepHandler);
|
||||||
|
privateDispatcher.register(SchedulerEventType.class,
|
||||||
|
rm.getResourceScheduler());
|
||||||
|
privateDispatcher.register(RMNodeEventType.class,
|
||||||
|
new ResourceManager.NodeEventDispatcher(privateContext));
|
||||||
|
((Service) privateDispatcher).init(conf);
|
||||||
|
((Service) privateDispatcher).start();
|
||||||
|
NMLivelinessMonitor nmLivelinessMonitor =
|
||||||
|
new NMLivelinessMonitor(privateDispatcher);
|
||||||
|
nmLivelinessMonitor.init(conf);
|
||||||
|
nmLivelinessMonitor.start();
|
||||||
|
NodesListManager nodesListManager = new NodesListManager(privateContext);
|
||||||
|
nodesListManager.init(conf);
|
||||||
|
RMContainerTokenSecretManager containerTokenSecretManager =
|
||||||
|
new RMContainerTokenSecretManager(conf);
|
||||||
|
containerTokenSecretManager.start();
|
||||||
|
NMTokenSecretManagerInRM nmTokenSecretManager =
|
||||||
|
new NMTokenSecretManagerInRM(conf);
|
||||||
|
nmTokenSecretManager.start();
|
||||||
|
ResourceTrackerService privateResourceTrackerService =
|
||||||
|
new ResourceTrackerService(privateContext, nodesListManager,
|
||||||
|
nmLivelinessMonitor, containerTokenSecretManager,
|
||||||
|
nmTokenSecretManager);
|
||||||
|
privateResourceTrackerService.init(conf);
|
||||||
|
privateResourceTrackerService.start();
|
||||||
|
rm.getResourceScheduler().setRMContext(privateContext);
|
||||||
|
return privateResourceTrackerService;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the behavior of the scheduler when a node reconnects
|
||||||
|
* with changed capabilities. This test is to catch any race conditions
|
||||||
|
* that might occur due to the use of the RMNode object.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testNodemanagerReconnect() throws Exception {
|
||||||
|
configureScheduler();
|
||||||
|
Configuration conf = getConf();
|
||||||
|
MockRM rm = new MockRM(conf);
|
||||||
|
try {
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false);
|
||||||
|
DrainDispatcher privateDispatcher = new DrainDispatcher();
|
||||||
|
SleepHandler sleepHandler = new SleepHandler();
|
||||||
|
ResourceTrackerService privateResourceTrackerService =
|
||||||
|
getPrivateResourceTrackerService(privateDispatcher, rm, sleepHandler);
|
||||||
|
|
||||||
|
// Register node1
|
||||||
|
String hostname1 = "localhost1";
|
||||||
|
Resource capability = BuilderUtils.newResource(4096, 4);
|
||||||
|
RecordFactory recordFactory =
|
||||||
|
RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
|
||||||
|
RegisterNodeManagerRequest request1 =
|
||||||
|
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
|
||||||
|
NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
|
||||||
|
request1.setNodeId(nodeId1);
|
||||||
|
request1.setHttpPort(0);
|
||||||
|
request1.setResource(capability);
|
||||||
|
privateResourceTrackerService.registerNodeManager(request1);
|
||||||
|
privateDispatcher.await();
|
||||||
|
Resource clusterResource =
|
||||||
|
rm.getResourceScheduler().getClusterResource();
|
||||||
|
Assert.assertEquals("Initial cluster resources don't match", capability,
|
||||||
|
clusterResource);
|
||||||
|
|
||||||
|
Resource newCapability = BuilderUtils.newResource(1024, 1);
|
||||||
|
RegisterNodeManagerRequest request2 =
|
||||||
|
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
|
||||||
|
request2.setNodeId(nodeId1);
|
||||||
|
request2.setHttpPort(0);
|
||||||
|
request2.setResource(newCapability);
|
||||||
|
// hold up the disaptcher and register the same node with lower capability
|
||||||
|
sleepHandler.sleepFlag = true;
|
||||||
|
privateResourceTrackerService.registerNodeManager(request2);
|
||||||
|
privateDispatcher.await();
|
||||||
|
Assert.assertEquals("Cluster resources don't match", newCapability,
|
||||||
|
rm.getResourceScheduler().getClusterResource());
|
||||||
|
privateResourceTrackerService.stop();
|
||||||
|
} finally {
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,6 @@ import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.service.Service;
|
|
||||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
|
@ -74,7 +73,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
||||||
import org.apache.hadoop.yarn.event.Event;
|
import org.apache.hadoop.yarn.event.Event;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
@ -82,7 +80,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
|
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.Application;
|
import org.apache.hadoop.yarn.server.resourcemanager.Application;
|
||||||
|
@ -90,13 +87,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.Task;
|
import org.apache.hadoop.yarn.server.resourcemanager.Task;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
|
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
|
||||||
|
@ -116,7 +110,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
|
@ -136,7 +129,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
|
@ -3285,108 +3277,6 @@ public class TestCapacityScheduler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class SleepHandler implements EventHandler<SchedulerEvent> {
|
|
||||||
boolean sleepFlag = false;
|
|
||||||
int sleepTime = 20;
|
|
||||||
@Override
|
|
||||||
public void handle(SchedulerEvent event) {
|
|
||||||
try {
|
|
||||||
if(sleepFlag) {
|
|
||||||
Thread.sleep(sleepTime);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch(InterruptedException ie) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private ResourceTrackerService getPrivateResourceTrackerService(
|
|
||||||
Dispatcher privateDispatcher, SleepHandler sleepHandler) {
|
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
ResourceTrackerService privateResourceTrackerService;
|
|
||||||
|
|
||||||
RMContext privateContext =
|
|
||||||
new RMContextImpl(privateDispatcher, null, null, null, null, null, null,
|
|
||||||
null, null, null);
|
|
||||||
privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class));
|
|
||||||
|
|
||||||
privateDispatcher.register(SchedulerEventType.class, sleepHandler);
|
|
||||||
privateDispatcher.register(SchedulerEventType.class,
|
|
||||||
resourceManager.getResourceScheduler());
|
|
||||||
privateDispatcher.register(RMNodeEventType.class,
|
|
||||||
new ResourceManager.NodeEventDispatcher(privateContext));
|
|
||||||
((Service) privateDispatcher).init(conf);
|
|
||||||
((Service) privateDispatcher).start();
|
|
||||||
NMLivelinessMonitor nmLivelinessMonitor =
|
|
||||||
new NMLivelinessMonitor(privateDispatcher);
|
|
||||||
nmLivelinessMonitor.init(conf);
|
|
||||||
nmLivelinessMonitor.start();
|
|
||||||
NodesListManager nodesListManager = new NodesListManager(privateContext);
|
|
||||||
nodesListManager.init(conf);
|
|
||||||
RMContainerTokenSecretManager containerTokenSecretManager =
|
|
||||||
new RMContainerTokenSecretManager(conf);
|
|
||||||
containerTokenSecretManager.start();
|
|
||||||
NMTokenSecretManagerInRM nmTokenSecretManager =
|
|
||||||
new NMTokenSecretManagerInRM(conf);
|
|
||||||
nmTokenSecretManager.start();
|
|
||||||
privateResourceTrackerService =
|
|
||||||
new ResourceTrackerService(privateContext, nodesListManager,
|
|
||||||
nmLivelinessMonitor, containerTokenSecretManager,
|
|
||||||
nmTokenSecretManager);
|
|
||||||
privateResourceTrackerService.init(conf);
|
|
||||||
privateResourceTrackerService.start();
|
|
||||||
resourceManager.getResourceScheduler().setRMContext(privateContext);
|
|
||||||
return privateResourceTrackerService;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test the behaviour of the capacity scheduler when a node reconnects
|
|
||||||
* with changed capabilities. This test is to catch any race conditions
|
|
||||||
* that might occur due to the use of the RMNode object.
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testNodemanagerReconnect() throws Exception {
|
|
||||||
|
|
||||||
DrainDispatcher privateDispatcher = new DrainDispatcher();
|
|
||||||
SleepHandler sleepHandler = new SleepHandler();
|
|
||||||
ResourceTrackerService privateResourceTrackerService =
|
|
||||||
getPrivateResourceTrackerService(privateDispatcher,
|
|
||||||
sleepHandler);
|
|
||||||
|
|
||||||
// Register node1
|
|
||||||
String hostname1 = "localhost1";
|
|
||||||
Resource capability = BuilderUtils.newResource(4096, 4);
|
|
||||||
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
||||||
|
|
||||||
RegisterNodeManagerRequest request1 =
|
|
||||||
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
|
|
||||||
NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
|
|
||||||
request1.setNodeId(nodeId1);
|
|
||||||
request1.setHttpPort(0);
|
|
||||||
request1.setResource(capability);
|
|
||||||
privateResourceTrackerService.registerNodeManager(request1);
|
|
||||||
privateDispatcher.await();
|
|
||||||
Resource clusterResource = resourceManager.getResourceScheduler().getClusterResource();
|
|
||||||
Assert.assertEquals("Initial cluster resources don't match", capability,
|
|
||||||
clusterResource);
|
|
||||||
|
|
||||||
Resource newCapability = BuilderUtils.newResource(1024, 1);
|
|
||||||
RegisterNodeManagerRequest request2 =
|
|
||||||
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
|
|
||||||
request2.setNodeId(nodeId1);
|
|
||||||
request2.setHttpPort(0);
|
|
||||||
request2.setResource(newCapability);
|
|
||||||
// hold up the disaptcher and register the same node with lower capability
|
|
||||||
sleepHandler.sleepFlag = true;
|
|
||||||
privateResourceTrackerService.registerNodeManager(request2);
|
|
||||||
privateDispatcher.await();
|
|
||||||
Assert.assertEquals("Cluster resources don't match", newCapability,
|
|
||||||
resourceManager.getResourceScheduler().getClusterResource());
|
|
||||||
privateResourceTrackerService.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testResourceUpdateDecommissioningNode() throws Exception {
|
public void testResourceUpdateDecommissioningNode() throws Exception {
|
||||||
// Mock the RMNodeResourceUpdate event handler to update SchedulerNode
|
// Mock the RMNodeResourceUpdate event handler to update SchedulerNode
|
||||||
|
|
Loading…
Reference in New Issue