YARN-10352 Skip schedule on not heartbeated nodes in Multi Node Placement. Contributed by Prabhu Joseph and Qi Zhu
Parent: f37bf65199
Commit: 6fc26ad539

@@ -712,6 +712,14 @@ public class YarnConfiguration extends Configuration {
   public static final float
       DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR = 1.0f;
 
+  /**
+   * Number of consecutive missed heartbeats after which node will be
+   * skipped from scheduling.
+   */
+  public static final String SCHEDULER_SKIP_NODE_MULTIPLIER =
+      YARN_PREFIX + "scheduler.skip.node.multiplier";
+  public static final int DEFAULT_SCHEDULER_SKIP_NODE_MULTIPLIER = 2;
+
   /** Number of worker threads that write the history data. */
   public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
       RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size";

@@ -4751,6 +4759,20 @@ public class YarnConfiguration extends Configuration {
         DEFAULT_NM_NUMA_AWARENESS_ENABLED);
   }
 
+  /**
+   * Returns Timeout to skip node from scheduling if not heartbeated.
+   * @param conf the configuration
+   * @return timeout in milliseconds.
+   */
+  public static long getSkipNodeInterval(Configuration conf) {
+    long heartbeatIntvl = conf.getLong(
+        YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
+    int multiplier = conf.getInt(SCHEDULER_SKIP_NODE_MULTIPLIER,
+        DEFAULT_SCHEDULER_SKIP_NODE_MULTIPLIER);
+    return multiplier * heartbeatIntvl;
+  }
+
   /* For debugging. mp configurations to system output as XML format. */
   public static void main(String[] args) throws Exception {
     new YarnConfiguration(new Configuration()).writeXml(System.out);

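Under the defaults this works out to a two-second window; a minimal arithmetic
sketch (assuming the stock 1000 ms NodeManager heartbeat interval):

    // Illustrative only; mirrors what getSkipNodeInterval(conf) returns
    // when neither value is overridden.
    long heartbeatIntervalMs = 1000L;   // assumed DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS
    int skipNodeMultiplier = 2;         // DEFAULT_SCHEDULER_SKIP_NODE_MULTIPLIER
    long skipNodeIntervalMs = skipNodeMultiplier * heartbeatIntervalMs;   // 2000 ms
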
@@ -910,6 +910,13 @@
     <value>1.0</value>
   </property>
 
+  <property>
+    <description>The Number of consecutive missed heartbeats after which node will be
+      skipped from scheduling</description>
+    <name>yarn.scheduler.skip.node.multiplier</name>
+    <value>2</value>
+  </property>
+
   <property>
     <description>The minimum allowed version of a connecting nodemanager. The valid values are
       NONE (no version checking), EqualToRM (the nodemanager's version is equal to

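The same multiplier can also be set programmatically on the scheduler
configuration; a small sketch (the value 4 is only an example):

    YarnConfiguration conf = new YarnConfiguration();
    // Tolerate 4 consecutive missed heartbeats before skipping a node.
    conf.setInt(YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER, 4);
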
@@ -159,6 +159,7 @@ public abstract class AbstractYarnScheduler
   protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
   protected int nmExpireInterval;
   protected long nmHeartbeatInterval;
+  private long skipNodeInterval;
 
   private final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();

@@ -210,6 +211,7 @@ public abstract class AbstractYarnScheduler
     nmHeartbeatInterval =
         conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
             YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
+    skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf);
     long configuredMaximumAllocationWaitTime =
         conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
             YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);

@@ -368,6 +370,10 @@ public abstract class AbstractYarnScheduler
     return lastNodeUpdateTime;
   }
 
+  public long getSkipNodeInterval(){
+    return skipNodeInterval;
+  }
+
   protected void containerLaunchedOnNode(
       ContainerId containerId, SchedulerNode node) {
     readLock.lock();

@@ -27,6 +27,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;

@@ -602,4 +603,11 @@ public class SchedulerUtils {
     node.allocateContainer(rmContainer);
     return rmContainer;
   }
+
+  public static boolean isNodeHeartbeated(SchedulerNode node,
+      long skipNodeInterval) {
+    long timeElapsedFromLastHeartbeat =
+        Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
+    return timeElapsedFromLastHeartbeat <= skipNodeInterval;
+  }
 }

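A node counts as heartbeated while the time since its last heartbeat is within
the skip interval computed from the configuration above. A plain-long sketch of
that check (this helper is hypothetical; the real method takes a SchedulerNode):

    // skipNodeIntervalMs = heartbeat interval * yarn.scheduler.skip.node.multiplier
    static boolean isHeartbeated(long lastHeartbeatMonotonicMs,
        long nowMonotonicMs, long skipNodeIntervalMs) {
      return nowMonotonicMs - lastHeartbeatMonotonicMs <= skipNodeIntervalMs;
    }
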
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

@@ -232,7 +233,7 @@ public class CapacityScheduler extends
 
   private CapacitySchedulerAutoQueueHandler autoQueueHandler;
 
-  private static boolean printedVerboseLoggingForAsyncScheduling = false;
+  private boolean printedVerboseLoggingForAsyncScheduling;
 
   /**
    * EXPERT

@@ -518,22 +519,47 @@ public class CapacityScheduler extends
 
   private final static Random random = new Random(System.currentTimeMillis());
 
-  private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
+  @VisibleForTesting
+  public static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
       CapacityScheduler cs, boolean printVerboseLog) {
-    // Skip node which missed 2 heartbeats since the node might be dead and
-    // we should not continue allocate containers on that.
-    long timeElapsedFromLastHeartbeat =
-        Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
-    if (timeElapsedFromLastHeartbeat > cs.nmHeartbeatInterval * 2) {
+    // Skip node which missed YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER
+    // heartbeats since the node might be dead and we should not continue
+    // allocate containers on that.
+    if (!SchedulerUtils.isNodeHeartbeated(node, cs.getSkipNodeInterval())) {
       if (printVerboseLog && LOG.isDebugEnabled()) {
-        LOG.debug("Skip scheduling on node because it haven't heartbeated for "
+        long timeElapsedFromLastHeartbeat =
+            Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
+        LOG.debug("Skip scheduling on node " + node.getNodeID()
+            + " because it haven't heartbeated for "
             + timeElapsedFromLastHeartbeat / 1000.0f + " secs");
       }
       return true;
     }
+
+    if (node.getRMNode().getState() != NodeState.RUNNING) {
+      if (printVerboseLog && LOG.isDebugEnabled()) {
+        LOG.debug("Skip scheduling on node because it is in " +
+            node.getRMNode().getState() + " state");
+      }
+      return true;
+    }
     return false;
   }
 
+  private static boolean isPrintSkippedNodeLogging(CapacityScheduler cs) {
+    // To avoid too verbose DEBUG logging, only print debug log once for
+    // every 10 secs.
+    boolean printSkipedNodeLogging = false;
+    if (LOG.isDebugEnabled()) {
+      if (Time.monotonicNow() / 1000 % 10 == 0) {
+        printSkipedNodeLogging = (!cs.printedVerboseLoggingForAsyncScheduling);
+      } else {
+        cs.printedVerboseLoggingForAsyncScheduling = false;
+      }
+    }
+    return printSkipedNodeLogging;
+  }
+
   /**
    * Schedule on all nodes by starting at a random point.
    * Schedule on all partitions by starting at a random partition

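After this change a node is skipped for either of two reasons: its last
heartbeat is older than the configured skip interval, or it is no longer in
the RUNNING state. A condensed sketch of that decision (the boolean parameters
are placeholders standing in for the real node checks):

    static boolean shouldSkip(boolean heartbeatedRecently, boolean running) {
      if (!heartbeatedRecently) {
        return true;   // missed too many heartbeats; the node may be dead
      }
      if (!running) {
        return true;   // e.g. DECOMMISSIONING; do not place new containers here
      }
      return false;
    }
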
@@ -555,19 +581,12 @@
     if (!cs.multiNodePlacementEnabled) {
       int start = random.nextInt(nodeSize);
 
-      // To avoid too verbose DEBUG logging, only print debug log once for
-      // every 10 secs.
-      boolean printSkipedNodeLogging = false;
-      if (Time.monotonicNow() / 1000 % 10 == 0) {
-        printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
-      } else {
-        printedVerboseLoggingForAsyncScheduling = false;
-      }
+      boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(cs);
 
       // Allocate containers of node [start, end)
       for (FiCaSchedulerNode node : nodes) {
         if (current++ >= start) {
-          if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+          if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) {
             continue;
           }
           cs.allocateContainersToNode(node.getNodeID(), false);

@@ -581,14 +600,14 @@
         if (current++ > start) {
           break;
         }
-        if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+        if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) {
           continue;
         }
         cs.allocateContainersToNode(node.getNodeID(), false);
       }
 
-      if (printSkipedNodeLogging) {
-        printedVerboseLoggingForAsyncScheduling = true;
+      if (printSkippedNodeLogging) {
+        cs.printedVerboseLoggingForAsyncScheduling = true;
       }
     } else {
       // Get all partitions

@@ -1541,20 +1560,37 @@
             || assignedContainers < maxAssignPerHeartbeat);
   }
 
-  private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
-      String partition) {
-    CandidateNodeSet<FiCaSchedulerNode> candidates = null;
+  private Map<NodeId, FiCaSchedulerNode> getNodesHeartbeated(String partition) {
     Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
+    boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(this);
     List<FiCaSchedulerNode> nodes = nodeTracker
         .getNodesPerPartition(partition);
 
     if (nodes != null && !nodes.isEmpty()) {
       //Filter for node heartbeat too long
       nodes.stream()
-          .filter(node -> !shouldSkipNodeSchedule(node, this, true))
-          .forEach(n -> nodesByPartition.put(n.getNodeID(), n));
-      candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
-          nodesByPartition, partition);
+          .filter(node ->
+              !shouldSkipNodeSchedule(node, this, printSkippedNodeLogging))
+          .forEach(n -> nodesByPartition.put(n.getNodeID(), n));
     }
+
+    if (printSkippedNodeLogging) {
+      printedVerboseLoggingForAsyncScheduling = true;
+    }
+    return nodesByPartition;
+  }
+
+  private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
+      String partition) {
+    CandidateNodeSet<FiCaSchedulerNode> candidates = null;
+    Map<NodeId, FiCaSchedulerNode> nodesByPartition
+        = getNodesHeartbeated(partition);
+
+    if (!nodesByPartition.isEmpty()) {
+      candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
+          nodesByPartition, partition);
+    }
+
     return candidates;
   }

@@ -1563,11 +1599,9 @@
     CandidateNodeSet<FiCaSchedulerNode> candidates = null;
     candidates = new SimpleCandidateNodeSet<>(node);
     if (multiNodePlacementEnabled) {
-      Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
-      List<FiCaSchedulerNode> nodes = nodeTracker
-          .getNodesPerPartition(node.getPartition());
-      if (nodes != null && !nodes.isEmpty()) {
-        nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
+      Map<NodeId, FiCaSchedulerNode> nodesByPartition =
+          getNodesHeartbeated(node.getPartition());
+      if (!nodesByPartition.isEmpty()) {
         candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
             nodesByPartition, node.getPartition());
       }

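The refactor funnels both call sites through a single helper that keeps only
recently heartbeated nodes. A generic sketch of that stream-filter-into-map
shape (the Predicate and id function below are stand-ins for the scheduler's
shouldSkipNodeSchedule check and NodeId key, not YARN types):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.function.Predicate;

    final class LiveNodeFilter {
      // Keep only the entries that pass the liveness predicate, keyed by id,
      // mirroring nodes.stream().filter(...).forEach(...) in the patch above.
      static <T> Map<String, T> filterLive(List<T> nodes, Predicate<T> isLive,
          Function<T, String> idOf) {
        Map<String, T> byId = new HashMap<>();
        nodes.stream()
            .filter(isLive)
            .forEach(n -> byId.put(idOf.apply(n), n));
        return byId;
      }
    }
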
@@ -135,7 +135,7 @@ public class MultiNodeSorter<N extends SchedulerNode> extends AbstractService {
     Map<NodeId, SchedulerNode> nodesByPartition = new HashMap<>();
     List<SchedulerNode> nodes = ((AbstractYarnScheduler) rmContext
         .getScheduler()).getNodeTracker().getNodesPerPartition(label);
-    if (nodes != null && !nodes.isEmpty()) {
+    if (nodes != null) {
       nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
       multiNodePolicy.addAndRefreshNodesSet(
           (Collection<N>) nodesByPartition.values(), label);

@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;

@@ -30,8 +31,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 
 /**
  * Node Sorting Manager which runs all sorter threads and policies.

@@ -48,6 +51,7 @@ public class MultiNodeSortingManager<N extends SchedulerNode>
   private Set<MultiNodePolicySpec> policySpecs = new HashSet<MultiNodePolicySpec>();
   private Configuration conf;
   private boolean multiNodePlacementEnabled;
+  private long skipNodeInterval;
 
   public MultiNodeSortingManager() {
     super("MultiNodeSortingManager");

@@ -59,6 +63,7 @@ public class MultiNodeSortingManager<N extends SchedulerNode>
     LOG.info("Initializing NodeSortingService=" + getName());
     super.serviceInit(configuration);
     this.conf = configuration;
+    this.skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf);
   }
 
   @Override

@@ -134,6 +139,42 @@ public class MultiNodeSortingManager<N extends SchedulerNode>
       policy.addAndRefreshNodesSet(nodes, partition);
     }
 
-    return policy.getPreferredNodeIterator(nodes, partition);
+    Iterator<N> nodesIterator = policy.getPreferredNodeIterator(nodes,
+        partition);
+
+    // Skip node which missed YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER
+    // heartbeats since the node might be dead and we should not continue
+    // allocate containers on that.
+    Iterator<N> filteringIterator = new Iterator() {
+      private N cached;
+      private boolean hasCached;
+
+      @Override
+      public boolean hasNext() {
+        if (hasCached) {
+          return true;
+        }
+        while (nodesIterator.hasNext()) {
+          cached = nodesIterator.next();
+          if (SchedulerUtils.isNodeHeartbeated(cached, skipNodeInterval)) {
+            hasCached = true;
+            return true;
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public N next() {
+        if (hasCached) {
+          hasCached = false;
+          return cached;
+        }
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        return next();
+      }
+    };
+    return filteringIterator;
   }
 }

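The node iterator returned to callers is now wrapped so stale nodes are dropped
lazily. A self-contained sketch of that look-ahead filtering pattern over plain
integers (nothing YARN-specific; the even-number check stands in for the
heartbeat liveness test):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    public class FilteringIteratorSketch {
      public static void main(String[] args) {
        Iterator<Integer> source = Arrays.asList(1, 2, 3, 4, 5).iterator();
        Iterator<Integer> evensOnly = new Iterator<Integer>() {
          private Integer cached;
          private boolean hasCached;

          @Override
          public boolean hasNext() {
            if (hasCached) {
              return true;            // an element is already buffered
            }
            while (source.hasNext()) {
              cached = source.next();
              if (cached % 2 == 0) {  // stand-in for isNodeHeartbeated(...)
                hasCached = true;
                return true;
              }
            }
            return false;
          }

          @Override
          public Integer next() {
            if (!hasNext()) {
              throw new NoSuchElementException();
            }
            hasCached = false;        // consume the buffered element
            return cached;
          }
        };
        while (evensOnly.hasNext()) {
          System.out.println(evensOnly.next());  // prints 2, then 4
        }
      }
    }

Buffering a single look-ahead element keeps hasNext() idempotent and lets
next() reuse it, which is the same shape as the filteringIterator added above.
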
@@ -574,8 +574,6 @@ public class TestCapacitySchedulerAsyncScheduling {
         + ".scheduling-interval-ms", 100);
     // Heartbeat interval is 100 ms.
     conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, heartbeatInterval);
-
-    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
     mgr.init(conf);
 
     // inject node label manager

@@ -648,6 +646,112 @@
     rm.close();
   }
 
+  /**
+   * Make sure scheduler skips NMs which are not RUNNING.
+   * @throws Exception
+   */
+  @Test
+  public void testAsyncSchedulerSkipNoRunningNMs() throws Exception {
+    int heartbeatInterval = 100;
+    conf.setInt(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+        1);
+    conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+        + ".scheduling-interval-ms", 100);
+    // Heartbeat interval is 100 ms.
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+        heartbeatInterval);
+    conf.setInt(YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER,
+        5);
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+
+    // inject node label manager
+    MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+
+    List<MockNM> nms = new ArrayList<>();
+    // Add 10 nodes to the cluster, in the cluster we have 200 GB resource
+    for (int i = 0; i < 10; i++) {
+      nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB));
+    }
+
+    keepNMHeartbeat(nms, heartbeatInterval);
+
+    List<MockAM> ams = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      RMApp rmApp = MockRMAppSubmitter.submit(rm,
+          MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
+              .withAppName("app")
+              .withUser("user")
+              .withAcls(null)
+              .withUnmanagedAM(false)
+              .withQueue(Character.toString((char) (i % 34 + 97)))
+              .withMaxAppAttempts(1)
+              .withCredentials(null)
+              .withAppType(null)
+              .withWaitForAppAcceptedState(false)
+              .build());
+      MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
+      am.registerAppAttempt();
+      ams.add(am);
+    }
+
+    // Test for no NodeState.RUNNING node
+    for (int i = 0; i < 5; i++) {
+      RMNode rmNode = cs.getNode(nms.get(i).getNodeId()).getRMNode();
+      cs.getRMContext().getDispatcher().getEventHandler().handle(
+          new RMNodeEvent(rmNode.getNodeID(),
+              RMNodeEventType.GRACEFUL_DECOMMISSION));
+      rm.drainEvents();
+      Assert.assertEquals(NodeState.DECOMMISSIONING, rmNode.getState());
+      boolean shouldSkip =
+          cs.shouldSkipNodeSchedule(cs.getNode(nms.get(i).getNodeId()),
+              cs, true);
+      // make sure should skip
+      Assert.assertTrue(shouldSkip);
+    }
+
+    for (int i = 5; i < 9; i++) {
+      boolean shouldSkip =
+          cs.shouldSkipNodeSchedule(cs.getNode(nms.get(i).getNodeId()),
+              cs, true);
+      // make sure should not skip
+      Assert.assertFalse(shouldSkip);
+    }
+
+    pauseNMHeartbeat();
+
+    //Not exceed configured 5
+    Thread.sleep(heartbeatInterval * 3);
+
+    // Applications request containers.
+    for (int i = 0; i < 3; i++) {
+      ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<>());
+    }
+
+    // Wait for 2000 ms.
+    Thread.sleep(2000);
+
+    //Make sure that NM 0-5 don't have non-AM containers.
+    for (int i = 0; i < 9; i++) {
+      if (i < 5) {
+        Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) == 0);
+      } else {
+        Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) > 0);
+      }
+    }
+    rm.close();
+  }
+
   public static class NMHeartbeatThread extends Thread {
     private List<MockNM> mockNMS;
     private int interval;

@@ -19,11 +19,14 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.collect.Iterators;
+
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;

@@ -437,4 +440,39 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
 
     rm1.close();
   }
+
+  @Test
+  public void testMultiNodeSorterAfterHeartbeatInterval() throws Exception {
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    rm.registerNode("127.0.0.1:1234", 10 * GB);
+    rm.registerNode("127.0.0.2:1234", 10 * GB);
+    rm.registerNode("127.0.0.3:1234", 10 * GB);
+    rm.registerNode("127.0.0.4:1234", 10 * GB);
+
+    Set<SchedulerNode> nodes = new HashSet<>();
+    String partition = "";
+
+    ResourceScheduler scheduler = rm.getRMContext().getScheduler();
+    waitforNMRegistered(scheduler, 4, 5);
+    MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
+        .getMultiNodeSortingManager();
+    MultiNodeSorter<SchedulerNode> sorter = mns
+        .getMultiNodePolicy(POLICY_CLASS_NAME);
+    sorter.reSortClusterNodes();
+
+    Iterator<SchedulerNode> nodeIterator = mns.getMultiNodeSortIterator(
+        nodes, partition, POLICY_CLASS_NAME);
+    Assert.assertEquals(4, Iterators.size(nodeIterator));
+
+    // Validate the count after missing 3 node heartbeats
+    Thread.sleep(YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS * 3);
+
+    nodeIterator = mns.getMultiNodeSortIterator(
+        nodes, partition, POLICY_CLASS_NAME);
+    Assert.assertEquals(0, Iterators.size(nodeIterator));
+
+    rm.stop();
+  }
+
 }

@@ -111,6 +111,7 @@ public class TestCapacitySchedulerMultiNodesWithPreemption
     conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
         ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
     conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 60000);
   }
 
   @Test(timeout=60000)

@@ -127,6 +127,7 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
     conf.set(policyConfPrefix + ".sorting-interval.ms", "0");
     conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
         YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 30000);
     rm = new MockRM(conf);
     bind(ResourceManager.class).toInstance(rm);
     serve("/*").with(GuiceContainer.class);