YARN-7790. Improve Capacity Scheduler Async Scheduling to better handle node failures. Contributed by Wangda Tan.
(cherry picked from commit e9c72d04be)
This commit is contained in:
parent
1d7e9b9ad8
commit
0463ede09d
|
@ -968,11 +968,11 @@ public abstract class AbstractYarnScheduler
|
||||||
/**
|
/**
|
||||||
* Get lists of new containers from NodeManager and process them.
|
* Get lists of new containers from NodeManager and process them.
|
||||||
* @param nm The RMNode corresponding to the NodeManager
|
* @param nm The RMNode corresponding to the NodeManager
|
||||||
|
* @param schedulerNode schedulerNode
|
||||||
* @return list of completed containers
|
* @return list of completed containers
|
||||||
*/
|
*/
|
||||||
protected List<ContainerStatus> updateNewContainerInfo(RMNode nm) {
|
private List<ContainerStatus> updateNewContainerInfo(RMNode nm,
|
||||||
SchedulerNode node = getNode(nm.getNodeID());
|
SchedulerNode schedulerNode) {
|
||||||
|
|
||||||
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
|
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
|
||||||
List<ContainerStatus> newlyLaunchedContainers =
|
List<ContainerStatus> newlyLaunchedContainers =
|
||||||
new ArrayList<>();
|
new ArrayList<>();
|
||||||
|
@ -987,14 +987,15 @@ public abstract class AbstractYarnScheduler
|
||||||
|
|
||||||
// Processing the newly launched containers
|
// Processing the newly launched containers
|
||||||
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
|
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
|
||||||
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
|
containerLaunchedOnNode(launchedContainer.getContainerId(),
|
||||||
|
schedulerNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Processing the newly increased containers
|
// Processing the newly increased containers
|
||||||
List<Container> newlyIncreasedContainers =
|
List<Container> newlyIncreasedContainers =
|
||||||
nm.pullNewlyIncreasedContainers();
|
nm.pullNewlyIncreasedContainers();
|
||||||
for (Container container : newlyIncreasedContainers) {
|
for (Container container : newlyIncreasedContainers) {
|
||||||
containerIncreasedOnNode(container.getId(), node, container);
|
containerIncreasedOnNode(container.getId(), schedulerNode, container);
|
||||||
}
|
}
|
||||||
|
|
||||||
return completedContainers;
|
return completedContainers;
|
||||||
|
@ -1005,12 +1006,12 @@ public abstract class AbstractYarnScheduler
|
||||||
* @param completedContainers Extracted list of completed containers
|
* @param completedContainers Extracted list of completed containers
|
||||||
* @param releasedResources Reference resource object for completed containers
|
* @param releasedResources Reference resource object for completed containers
|
||||||
* @param nodeId NodeId corresponding to the NodeManager
|
* @param nodeId NodeId corresponding to the NodeManager
|
||||||
|
* @param schedulerNode schedulerNode
|
||||||
* @return The total number of released containers
|
* @return The total number of released containers
|
||||||
*/
|
*/
|
||||||
protected int updateCompletedContainers(List<ContainerStatus>
|
private int updateCompletedContainers(List<ContainerStatus> completedContainers,
|
||||||
completedContainers, Resource releasedResources, NodeId nodeId) {
|
Resource releasedResources, NodeId nodeId, SchedulerNode schedulerNode) {
|
||||||
int releasedContainers = 0;
|
int releasedContainers = 0;
|
||||||
SchedulerNode node = getNode(nodeId);
|
|
||||||
List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>();
|
List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>();
|
||||||
for (ContainerStatus completedContainer : completedContainers) {
|
for (ContainerStatus completedContainer : completedContainers) {
|
||||||
ContainerId containerId = completedContainer.getContainerId();
|
ContainerId containerId = completedContainer.getContainerId();
|
||||||
|
@ -1018,8 +1019,8 @@ public abstract class AbstractYarnScheduler
|
||||||
RMContainer container = getRMContainer(containerId);
|
RMContainer container = getRMContainer(containerId);
|
||||||
completedContainer(container,
|
completedContainer(container,
|
||||||
completedContainer, RMContainerEventType.FINISHED);
|
completedContainer, RMContainerEventType.FINISHED);
|
||||||
if (node != null) {
|
if (schedulerNode != null) {
|
||||||
node.releaseContainer(containerId, true);
|
schedulerNode.releaseContainer(containerId, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (container != null) {
|
if (container != null) {
|
||||||
|
@ -1064,14 +1065,14 @@ public abstract class AbstractYarnScheduler
|
||||||
/**
|
/**
|
||||||
* Update container and utilization information on the NodeManager.
|
* Update container and utilization information on the NodeManager.
|
||||||
* @param nm The NodeManager to update
|
* @param nm The NodeManager to update
|
||||||
|
* @param schedulerNode schedulerNode
|
||||||
*/
|
*/
|
||||||
protected void updateNodeResourceUtilization(RMNode nm) {
|
protected void updateNodeResourceUtilization(RMNode nm,
|
||||||
SchedulerNode node = getNode(nm.getNodeID());
|
SchedulerNode schedulerNode) {
|
||||||
// Updating node resource utilization
|
// Updating node resource utilization
|
||||||
node.setAggregatedContainersUtilization(
|
schedulerNode.setAggregatedContainersUtilization(
|
||||||
nm.getAggregatedContainersUtilization());
|
nm.getAggregatedContainersUtilization());
|
||||||
node.setNodeUtilization(nm.getNodeUtilization());
|
schedulerNode.setNodeUtilization(nm.getNodeUtilization());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1085,12 +1086,17 @@ public abstract class AbstractYarnScheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process new container information
|
// Process new container information
|
||||||
List<ContainerStatus> completedContainers = updateNewContainerInfo(nm);
|
SchedulerNode schedulerNode = getNode(nm.getNodeID());
|
||||||
|
List<ContainerStatus> completedContainers = updateNewContainerInfo(nm,
|
||||||
|
schedulerNode);
|
||||||
|
|
||||||
|
// Notify Scheduler Node updated.
|
||||||
|
schedulerNode.notifyNodeUpdate();
|
||||||
|
|
||||||
// Process completed containers
|
// Process completed containers
|
||||||
Resource releasedResources = Resource.newInstance(0, 0);
|
Resource releasedResources = Resource.newInstance(0, 0);
|
||||||
int releasedContainers = updateCompletedContainers(completedContainers,
|
int releasedContainers = updateCompletedContainers(completedContainers,
|
||||||
releasedResources, nm.getNodeID());
|
releasedResources, nm.getNodeID(), schedulerNode);
|
||||||
|
|
||||||
// If the node is decommissioning, send an update to have the total
|
// If the node is decommissioning, send an update to have the total
|
||||||
// resource equal to the used resource, so no available resource to
|
// resource equal to the used resource, so no available resource to
|
||||||
|
@ -1103,18 +1109,17 @@ public abstract class AbstractYarnScheduler
|
||||||
.getEventHandler()
|
.getEventHandler()
|
||||||
.handle(
|
.handle(
|
||||||
new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
|
new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
|
||||||
.newInstance(getSchedulerNode(nm.getNodeID())
|
.newInstance(schedulerNode.getAllocatedResource(), 0)));
|
||||||
.getAllocatedResource(), 0)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
updateSchedulerHealthInformation(releasedResources, releasedContainers);
|
updateSchedulerHealthInformation(releasedResources, releasedContainers);
|
||||||
updateNodeResourceUtilization(nm);
|
updateNodeResourceUtilization(nm, schedulerNode);
|
||||||
|
|
||||||
// Now node data structures are up-to-date and ready for scheduling.
|
// Now node data structures are up-to-date and ready for scheduling.
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
SchedulerNode node = getNode(nm.getNodeID());
|
LOG.debug(
|
||||||
LOG.debug("Node being looked for scheduling " + nm +
|
"Node being looked for scheduling " + nm + " availableResource: "
|
||||||
" availableResource: " + node.getUnallocatedResource());
|
+ schedulerNode.getUnallocatedResource());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
|
@ -76,6 +77,9 @@ public abstract class SchedulerNode {
|
||||||
|
|
||||||
private volatile Set<String> labels = null;
|
private volatile Set<String> labels = null;
|
||||||
|
|
||||||
|
// Last updated time
|
||||||
|
private volatile long lastHeartbeatMonotonicTime;
|
||||||
|
|
||||||
public SchedulerNode(RMNode node, boolean usePortForNodeName,
|
public SchedulerNode(RMNode node, boolean usePortForNodeName,
|
||||||
Set<String> labels) {
|
Set<String> labels) {
|
||||||
this.rmNode = node;
|
this.rmNode = node;
|
||||||
|
@ -87,6 +91,7 @@ public abstract class SchedulerNode {
|
||||||
nodeName = rmNode.getHostName();
|
nodeName = rmNode.getHostName();
|
||||||
}
|
}
|
||||||
this.labels = ImmutableSet.copyOf(labels);
|
this.labels = ImmutableSet.copyOf(labels);
|
||||||
|
this.lastHeartbeatMonotonicTime = Time.monotonicNow();
|
||||||
}
|
}
|
||||||
|
|
||||||
public SchedulerNode(RMNode node, boolean usePortForNodeName) {
|
public SchedulerNode(RMNode node, boolean usePortForNodeName) {
|
||||||
|
@ -453,6 +458,17 @@ public abstract class SchedulerNode {
|
||||||
return this.nodeUtilization;
|
return this.nodeUtilization;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getLastHeartbeatMonotonicTime() {
|
||||||
|
return lastHeartbeatMonotonicTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This will be called for each node heartbeat.
|
||||||
|
*/
|
||||||
|
public void notifyNodeUpdate() {
|
||||||
|
this.lastHeartbeatMonotonicTime = Time.monotonicNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static class ContainerInfo {
|
private static class ContainerInfo {
|
||||||
private final RMContainer container;
|
private final RMContainer container;
|
||||||
|
|
|
@ -235,6 +235,8 @@ public class CapacityScheduler extends
|
||||||
private RMNodeLabelsManager labelManager;
|
private RMNodeLabelsManager labelManager;
|
||||||
private AppPriorityACLsManager appPriorityACLManager;
|
private AppPriorityACLsManager appPriorityACLManager;
|
||||||
|
|
||||||
|
private static boolean printedVerboseLoggingForAsyncScheduling = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* EXPERT
|
* EXPERT
|
||||||
*/
|
*/
|
||||||
|
@ -456,6 +458,22 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
private final static Random random = new Random(System.currentTimeMillis());
|
private final static Random random = new Random(System.currentTimeMillis());
|
||||||
|
|
||||||
|
private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
|
||||||
|
CapacityScheduler cs, boolean printVerboseLog) {
|
||||||
|
// Skip node which missed 2 heartbeats since the node might be dead and
|
||||||
|
// we should not continue allocate containers on that.
|
||||||
|
long timeElapsedFromLastHeartbeat =
|
||||||
|
Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
|
||||||
|
if (timeElapsedFromLastHeartbeat > cs.nmHeartbeatInterval * 2) {
|
||||||
|
if (printVerboseLog && LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Skip scheduling on node because it haven't heartbeated for "
|
||||||
|
+ timeElapsedFromLastHeartbeat / 1000.0f + " secs");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Schedule on all nodes by starting at a random point.
|
* Schedule on all nodes by starting at a random point.
|
||||||
* @param cs
|
* @param cs
|
||||||
|
@ -466,16 +484,42 @@ public class CapacityScheduler extends
|
||||||
Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
|
Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
|
||||||
int start = random.nextInt(nodes.size());
|
int start = random.nextInt(nodes.size());
|
||||||
|
|
||||||
|
// To avoid too verbose DEBUG logging, only print debug log once for
|
||||||
|
// every 10 secs.
|
||||||
|
boolean printSkipedNodeLogging = false;
|
||||||
|
if (Time.monotonicNow() / 1000 % 10 == 0) {
|
||||||
|
printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
|
||||||
|
} else {
|
||||||
|
printedVerboseLoggingForAsyncScheduling = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allocate containers of node [start, end)
|
||||||
for (FiCaSchedulerNode node : nodes) {
|
for (FiCaSchedulerNode node : nodes) {
|
||||||
if (current++ >= start) {
|
if (current++ >= start) {
|
||||||
|
if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
cs.allocateContainersToNode(node.getNodeID(), false);
|
cs.allocateContainersToNode(node.getNodeID(), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Now, just get everyone to be safe
|
|
||||||
|
current = 0;
|
||||||
|
|
||||||
|
// Allocate containers of node [0, start)
|
||||||
for (FiCaSchedulerNode node : nodes) {
|
for (FiCaSchedulerNode node : nodes) {
|
||||||
|
if (current++ > start) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
cs.allocateContainersToNode(node.getNodeID(), false);
|
cs.allocateContainersToNode(node.getNodeID(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (printSkipedNodeLogging) {
|
||||||
|
printedVerboseLoggingForAsyncScheduling = true;
|
||||||
|
}
|
||||||
|
|
||||||
Thread.sleep(cs.getAsyncScheduleInterval());
|
Thread.sleep(cs.getAsyncScheduleInterval());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,13 +28,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAsyncScheduling;
|
||||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
public class TestRMHAForAsyncScheduler extends RMHATestBase {
|
public class TestRMHAForAsyncScheduler extends RMHATestBase {
|
||||||
|
private TestCapacitySchedulerAsyncScheduling.NMHeartbeatThread
|
||||||
|
nmHeartbeatThread = null;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
@Override
|
@Override
|
||||||
|
@ -57,26 +63,49 @@ public class TestRMHAForAsyncScheduler extends RMHATestBase {
|
||||||
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
|
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
|
||||||
|
if (nmHeartbeatThread != null) {
|
||||||
|
nmHeartbeatThread.setShouldStop();
|
||||||
|
nmHeartbeatThread = null;
|
||||||
|
}
|
||||||
|
nmHeartbeatThread =
|
||||||
|
new TestCapacitySchedulerAsyncScheduling.NMHeartbeatThread(mockNMs,
|
||||||
|
interval);
|
||||||
|
nmHeartbeatThread.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void pauseNMHeartbeat() {
|
||||||
|
if (nmHeartbeatThread != null) {
|
||||||
|
nmHeartbeatThread.setShouldStop();
|
||||||
|
nmHeartbeatThread = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
|
public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
|
||||||
// start two RMs, and transit rm1 to active, rm2 to standby
|
// start two RMs, and transit rm1 to active, rm2 to standby
|
||||||
startRMs();
|
startRMs();
|
||||||
// register NM
|
// register NM
|
||||||
rm1.registerNode("h1:1234", 8192, 8);
|
MockNM nm = rm1.registerNode("192.1.1.1:1234", 8192, 8);
|
||||||
// submit app1 and check
|
// submit app1 and check
|
||||||
RMApp app1 = submitAppAndCheckLaunched(rm1);
|
RMApp app1 = submitAppAndCheckLaunched(rm1);
|
||||||
|
keepNMHeartbeat(Arrays.asList(nm), 1000);
|
||||||
|
|
||||||
// failover RM1 to RM2
|
// failover RM1 to RM2
|
||||||
explicitFailover();
|
explicitFailover();
|
||||||
checkAsyncSchedulerThreads(Thread.currentThread());
|
checkAsyncSchedulerThreads(Thread.currentThread());
|
||||||
|
pauseNMHeartbeat();
|
||||||
|
|
||||||
// register NM, kill app1
|
// register NM, kill app1
|
||||||
rm2.registerNode("h1:1234", 8192, 8);
|
nm = rm2.registerNode("192.1.1.1:1234", 8192, 8);
|
||||||
|
keepNMHeartbeat(Arrays.asList(nm), 1000);
|
||||||
|
|
||||||
rm2.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
|
rm2.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
RMAppAttemptState.LAUNCHED);
|
RMAppAttemptState.LAUNCHED);
|
||||||
rm2.killApp(app1.getApplicationId());
|
rm2.killApp(app1.getApplicationId());
|
||||||
// submit app3 and check
|
// submit app3 and check
|
||||||
RMApp app2 = submitAppAndCheckLaunched(rm2);
|
RMApp app2 = submitAppAndCheckLaunched(rm2);
|
||||||
|
pauseNMHeartbeat();
|
||||||
|
|
||||||
// failover RM2 to RM1
|
// failover RM2 to RM1
|
||||||
HAServiceProtocol.StateChangeRequestInfo requestInfo =
|
HAServiceProtocol.StateChangeRequestInfo requestInfo =
|
||||||
|
@ -92,12 +121,15 @@ public class TestRMHAForAsyncScheduler extends RMHATestBase {
|
||||||
checkAsyncSchedulerThreads(Thread.currentThread());
|
checkAsyncSchedulerThreads(Thread.currentThread());
|
||||||
|
|
||||||
// register NM, kill app2
|
// register NM, kill app2
|
||||||
rm1.registerNode("h1:1234", 8192, 8);
|
nm = rm1.registerNode("192.1.1.1:1234", 8192, 8);
|
||||||
|
keepNMHeartbeat(Arrays.asList(nm), 1000);
|
||||||
|
|
||||||
rm1.waitForState(app2.getCurrentAppAttempt().getAppAttemptId(),
|
rm1.waitForState(app2.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
RMAppAttemptState.LAUNCHED);
|
RMAppAttemptState.LAUNCHED);
|
||||||
rm1.killApp(app2.getApplicationId());
|
rm1.killApp(app2.getApplicationId());
|
||||||
// submit app3 and check
|
// submit app3 and check
|
||||||
submitAppAndCheckLaunched(rm1);
|
submitAppAndCheckLaunched(rm1);
|
||||||
|
pauseNMHeartbeat();
|
||||||
|
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
|
||||||
|
@ -72,6 +73,8 @@ public class TestCapacitySchedulerAsyncScheduling {
|
||||||
|
|
||||||
RMNodeLabelsManager mgr;
|
RMNodeLabelsManager mgr;
|
||||||
|
|
||||||
|
private NMHeartbeatThread nmHeartbeatThread = null;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
conf = new YarnConfiguration();
|
conf = new YarnConfiguration();
|
||||||
|
@ -122,9 +125,11 @@ public class TestCapacitySchedulerAsyncScheduling {
|
||||||
List<MockNM> nms = new ArrayList<>();
|
List<MockNM> nms = new ArrayList<>();
|
||||||
// Add 10 nodes to the cluster, in the cluster we have 200 GB resource
|
// Add 10 nodes to the cluster, in the cluster we have 200 GB resource
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
nms.add(rm.registerNode("h-" + i + ":1234", 20 * GB));
|
nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
keepNMHeartbeat(nms, 1000);
|
||||||
|
|
||||||
List<MockAM> ams = new ArrayList<>();
|
List<MockAM> ams = new ArrayList<>();
|
||||||
// Add 3 applications to the cluster, one app in one queue
|
// Add 3 applications to the cluster, one app in one queue
|
||||||
// the i-th app ask (20 * i) containers. So in total we will have
|
// the i-th app ask (20 * i) containers. So in total we will have
|
||||||
|
@ -185,8 +190,8 @@ public class TestCapacitySchedulerAsyncScheduling {
|
||||||
// init RM & NMs & Nodes
|
// init RM & NMs & Nodes
|
||||||
final MockRM rm = new MockRM(disableAsyncConf);
|
final MockRM rm = new MockRM(disableAsyncConf);
|
||||||
rm.start();
|
rm.start();
|
||||||
final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
|
final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 9 * GB);
|
||||||
final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
|
final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 9 * GB);
|
||||||
List<MockNM> nmLst = new ArrayList<>();
|
List<MockNM> nmLst = new ArrayList<>();
|
||||||
nmLst.add(nm1);
|
nmLst.add(nm1);
|
||||||
nmLst.add(nm2);
|
nmLst.add(nm2);
|
||||||
|
@ -277,8 +282,8 @@ public class TestCapacitySchedulerAsyncScheduling {
|
||||||
// init RM & NMs & Nodes
|
// init RM & NMs & Nodes
|
||||||
final MockRM rm = new MockRM(disableAsyncConf);
|
final MockRM rm = new MockRM(disableAsyncConf);
|
||||||
rm.start();
|
rm.start();
|
||||||
final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
|
final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB);
|
||||||
final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
|
final MockNM nm2 = rm.registerNode("127.0.0.2:2234", 9 * GB);
|
||||||
|
|
||||||
// init scheduler nodes
|
// init scheduler nodes
|
||||||
int waitTime = 1000;
|
int waitTime = 1000;
|
||||||
|
@ -416,8 +421,8 @@ public class TestCapacitySchedulerAsyncScheduling {
|
||||||
// init RM & NMs & Nodes
|
// init RM & NMs & Nodes
|
||||||
final MockRM rm = new MockRM(disableAsyncConf);
|
final MockRM rm = new MockRM(disableAsyncConf);
|
||||||
rm.start();
|
rm.start();
|
||||||
final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
|
final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB);
|
||||||
final MockNM nm2 = rm.registerNode("h2:1234", 9 * GB);
|
final MockNM nm2 = rm.registerNode("127.0.0.2:1234", 9 * GB);
|
||||||
List<MockNM> nmLst = new ArrayList<>();
|
List<MockNM> nmLst = new ArrayList<>();
|
||||||
nmLst.add(nm1);
|
nmLst.add(nm1);
|
||||||
nmLst.add(nm2);
|
nmLst.add(nm2);
|
||||||
|
@ -476,6 +481,146 @@ public class TestCapacitySchedulerAsyncScheduling {
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure scheduler skips NMs which haven't heartbeat for a while.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testAsyncSchedulerSkipNoHeartbeatNMs() throws Exception {
|
||||||
|
int heartbeatInterval = 100;
|
||||||
|
conf.setInt(
|
||||||
|
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
|
||||||
|
1);
|
||||||
|
conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
|
||||||
|
+ ".scheduling-interval-ms", 100);
|
||||||
|
// Heartbeat interval is 100 ms.
|
||||||
|
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, heartbeatInterval);
|
||||||
|
|
||||||
|
final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
|
||||||
|
mgr.init(conf);
|
||||||
|
|
||||||
|
// inject node label manager
|
||||||
|
MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) {
|
||||||
|
@Override
|
||||||
|
public RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
|
||||||
|
rm.getRMContext().setNodeLabelManager(mgr);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
List<MockNM> nms = new ArrayList<>();
|
||||||
|
// Add 10 nodes to the cluster, in the cluster we have 200 GB resource
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB));
|
||||||
|
}
|
||||||
|
|
||||||
|
List<MockAM> ams = new ArrayList<>();
|
||||||
|
|
||||||
|
keepNMHeartbeat(nms, heartbeatInterval);
|
||||||
|
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
RMApp rmApp = rm.submitApp(1024, "app", "user", null, false,
|
||||||
|
Character.toString((char) (i % 34 + 97)), 1, null, null, false);
|
||||||
|
MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
|
||||||
|
am.registerAppAttempt();
|
||||||
|
ams.add(am);
|
||||||
|
}
|
||||||
|
|
||||||
|
pauseNMHeartbeat();
|
||||||
|
|
||||||
|
Thread.sleep(heartbeatInterval * 3);
|
||||||
|
|
||||||
|
// Applications request containers.
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<>());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
// Do heartbeat for NM 0-4
|
||||||
|
nms.get(i).nodeHeartbeat(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for 2000 ms.
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
// Make sure that NM5-9 don't have non-AM containers.
|
||||||
|
for (int i = 0; i < 9; i++) {
|
||||||
|
if (i < 5) {
|
||||||
|
Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) > 0);
|
||||||
|
} else {
|
||||||
|
Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) == 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rm.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class NMHeartbeatThread extends Thread {
|
||||||
|
private List<MockNM> mockNMS;
|
||||||
|
private int interval;
|
||||||
|
private volatile boolean shouldStop = false;
|
||||||
|
|
||||||
|
public NMHeartbeatThread(List<MockNM> mockNMs, int interval) {
|
||||||
|
this.mockNMS = mockNMs;
|
||||||
|
this.interval = interval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
while (true) {
|
||||||
|
if (shouldStop) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
for (MockNM nm : mockNMS) {
|
||||||
|
try {
|
||||||
|
nm.nodeHeartbeat(true);
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(interval);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setShouldStop() {
|
||||||
|
shouldStop = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
|
||||||
|
if (nmHeartbeatThread != null) {
|
||||||
|
nmHeartbeatThread.setShouldStop();
|
||||||
|
nmHeartbeatThread = null;
|
||||||
|
}
|
||||||
|
nmHeartbeatThread = new NMHeartbeatThread(mockNMs, interval);
|
||||||
|
nmHeartbeatThread.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void pauseNMHeartbeat() {
|
||||||
|
if (nmHeartbeatThread != null) {
|
||||||
|
nmHeartbeatThread.setShouldStop();
|
||||||
|
nmHeartbeatThread = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int checkNumNonAMContainersOnNode(CapacityScheduler cs, MockNM nm) {
|
||||||
|
SchedulerNode node = cs.getNode(nm.getNodeId());
|
||||||
|
int nonAMContainer = 0;
|
||||||
|
for (RMContainer c : node.getCopiedListOfRunningContainers()) {
|
||||||
|
if (!c.isAMContainer()) {
|
||||||
|
nonAMContainer++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nonAMContainer;
|
||||||
|
}
|
||||||
|
|
||||||
private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
|
private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
|
||||||
int nContainer, Resource resource, int priority, int startContainerId)
|
int nContainer, Resource resource, int priority, int startContainerId)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue