YARN-10475: Scale RM-NM heartbeat interval based on node utilization. Contributed by Jim Brennan (Jim_Brennan).
This commit is contained in:
parent
cdaef111d5
commit
2473e8b711
|
@ -246,6 +246,13 @@ public class NodeInfo {
|
|||
@Override
|
||||
public void resetUpdatedCapability() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public long calculateHeartBeatInterval(
|
||||
long defaultInterval, long minInterval, long maxInterval,
|
||||
float speedupFactor, float slowdownFactor) {
|
||||
return defaultInterval;
|
||||
}
|
||||
}
|
||||
|
||||
public static RMNode newNodeInfo(String rackName, String hostName,
|
||||
|
|
|
@ -231,4 +231,11 @@ public class RMNodeWrapper implements RMNode {
|
|||
@Override
|
||||
public void resetUpdatedCapability() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public long calculateHeartBeatInterval(
|
||||
long defaultInterval, long minInterval, long maxInterval,
|
||||
float speedupFactor, float slowdownFactor) {
|
||||
return defaultInterval;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -690,6 +690,30 @@ public class YarnConfiguration extends Configuration {
|
|||
RM_PREFIX + "nodemanagers.heartbeat-interval-ms";
|
||||
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000;
|
||||
|
||||
/** Enable Heartbeat Interval Scaling based on cpu utilization. */
|
||||
public static final String RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE =
|
||||
RM_PREFIX + "nodemanagers.heartbeat-interval-scaling-enable";
|
||||
public static final boolean
|
||||
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE = false;
|
||||
|
||||
public static final String RM_NM_HEARTBEAT_INTERVAL_MIN_MS =
|
||||
RM_PREFIX + "nodemanagers.heartbeat-interval-min-ms";
|
||||
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS = 1000;
|
||||
|
||||
public static final String RM_NM_HEARTBEAT_INTERVAL_MAX_MS =
|
||||
RM_PREFIX + "nodemanagers.heartbeat-interval-max-ms";
|
||||
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS = 1000;
|
||||
|
||||
public static final String RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR =
|
||||
RM_PREFIX + "nodemanagers.heartbeat-interval-speedup-factor";
|
||||
public static final float
|
||||
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR = 1.0f;
|
||||
|
||||
public static final String RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR =
|
||||
RM_PREFIX + "nodemanagers.heartbeat-interval-slowdown-factor";
|
||||
public static final float
|
||||
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR = 1.0f;
|
||||
|
||||
/** Number of worker threads that write the history data. */
|
||||
public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
|
||||
RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size";
|
||||
|
|
|
@ -854,6 +854,56 @@
|
|||
<value>1000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Enables heart-beat interval scaling. The NodeManager
|
||||
heart-beat interval will scale based on the difference between the CPU
|
||||
utilization on the node and the cluster-wide average CPU utilization.
|
||||
</description>
|
||||
<name>
|
||||
yarn.resourcemanager.nodemanagers.heartbeat-interval-scaling-enable
|
||||
</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>If heart-beat interval scaling is enabled, this is the
|
||||
minimum heart-beat interval in milliseconds
|
||||
</description>
|
||||
<name>yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms</name>
|
||||
<value>1000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>If heart-beat interval scaling is enabled, this is the
|
||||
maximum heart-beat interval in milliseconds</description>
|
||||
<name>yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms</name>
|
||||
<value>1000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>If heart-beat interval scaling is enabled, this controls
|
||||
the degree of adjustment when speeding up heartbeat intervals.
|
||||
At 1.0, 20% less than average CPU utilization will result in a 20%
|
||||
decrease in heartbeat interval.
|
||||
</description>
|
||||
<name>
|
||||
yarn.resourcemanager.nodemanagers.heartbeat-interval-speedup-factor
|
||||
</name>
|
||||
<value>1.0</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>If heart-beat interval scaling is enabled, this controls
|
||||
the degree of adjustment when slowing down heartbeat intervals.
|
||||
At 1.0, 20% greater than average CPU utilization will result in a 20%
|
||||
increase in heartbeat interval.
|
||||
</description>
|
||||
<name>
|
||||
yarn.resourcemanager.nodemanagers.heartbeat-interval-slowdown-factor
|
||||
</name>
|
||||
<value>1.0</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The minimum allowed version of a connecting nodemanager. The valid values are
|
||||
NONE (no version checking), EqualToRM (the nodemanager's version is equal to
|
||||
|
|
|
@ -730,6 +730,14 @@ public class AdminService extends CompositeService implements
|
|||
// refresh dynamic resource in ResourceTrackerService
|
||||
this.rm.getRMContext().getResourceTrackerService().
|
||||
updateDynamicResourceConfiguration(newConf);
|
||||
|
||||
// Update our heartbeat configuration as well
|
||||
Configuration ysconf =
|
||||
getConfiguration(new Configuration(false),
|
||||
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
|
||||
this.rm.getRMContext().getResourceTrackerService()
|
||||
.updateHeartBeatConfiguration(ysconf);
|
||||
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
|
||||
"AdminService");
|
||||
return response;
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
|||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
|
@ -53,6 +54,8 @@ public class ClusterMetrics {
|
|||
private MutableRate aMContainerAllocationDelay;
|
||||
@Metric("Memory Utilization") MutableGaugeLong utilizedMB;
|
||||
@Metric("Vcore Utilization") MutableGaugeLong utilizedVirtualCores;
|
||||
@Metric("Memory Capability") MutableGaugeLong capabilityMB;
|
||||
@Metric("Vcore Capability") MutableGaugeLong capabilityVirtualCores;
|
||||
|
||||
private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
|
||||
"Metrics for the Yarn Cluster");
|
||||
|
@ -83,7 +86,7 @@ public class ClusterMetrics {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
synchronized static void destroy() {
|
||||
public synchronized static void destroy() {
|
||||
isInitialized.set(false);
|
||||
INSTANCE = null;
|
||||
}
|
||||
|
@ -195,6 +198,28 @@ public class ClusterMetrics {
|
|||
aMRegisterDelay.add(delay);
|
||||
}
|
||||
|
||||
public long getCapabilityMB() {
|
||||
return capabilityMB.value();
|
||||
}
|
||||
|
||||
public long getCapabilityVirtualCores() {
|
||||
return capabilityVirtualCores.value();
|
||||
}
|
||||
|
||||
public void incrCapability(Resource res) {
|
||||
if (res != null) {
|
||||
capabilityMB.incr(res.getMemorySize());
|
||||
capabilityVirtualCores.incr(res.getVirtualCores());
|
||||
}
|
||||
}
|
||||
|
||||
public void decrCapability(Resource res) {
|
||||
if (res != null) {
|
||||
capabilityMB.decr(res.getMemorySize());
|
||||
capabilityVirtualCores.decr(res.getVirtualCores());
|
||||
}
|
||||
}
|
||||
|
||||
public void addAMContainerAllocationDelay(long delay) {
|
||||
aMContainerAllocationDelay.add(delay);
|
||||
}
|
||||
|
|
|
@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.api.records.NodeAttribute;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
|
@ -114,6 +113,13 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
private final WriteLock writeLock;
|
||||
|
||||
private long nextHeartBeatInterval;
|
||||
private boolean heartBeatIntervalScalingEnable;
|
||||
private long heartBeatIntervalMin;
|
||||
private long heartBeatIntervalMax;
|
||||
private float heartBeatIntervalSpeedupFactor;
|
||||
private float heartBeatIntervalSlowdownFactor;
|
||||
|
||||
|
||||
private Server server;
|
||||
private InetSocketAddress resourceTrackerAddress;
|
||||
private String minimumNodeManagerVersion;
|
||||
|
@ -157,14 +163,6 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
|
||||
|
||||
RackResolver.init(conf);
|
||||
nextHeartBeatInterval =
|
||||
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
|
||||
if (nextHeartBeatInterval <= 0) {
|
||||
throw new YarnRuntimeException("Invalid Configuration. "
|
||||
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
|
||||
+ " should be larger than 0.");
|
||||
}
|
||||
|
||||
checkIpHostnameInRegistration = conf.getBoolean(
|
||||
YarnConfiguration.RM_NM_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
|
||||
|
@ -188,7 +186,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
isDelegatedCentralizedNodeLabelsConf =
|
||||
YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf);
|
||||
}
|
||||
|
||||
updateHeartBeatConfiguration(conf);
|
||||
loadDynamicResourceConfiguration(conf);
|
||||
decommissioningWatcher.init(conf);
|
||||
super.serviceInit(conf);
|
||||
|
@ -233,6 +231,84 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update HearBeatConfiguration with new configuration.
|
||||
* @param conf Yarn Configuration
|
||||
*/
|
||||
public void updateHeartBeatConfiguration(Configuration conf) {
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
nextHeartBeatInterval =
|
||||
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
|
||||
heartBeatIntervalScalingEnable =
|
||||
conf.getBoolean(
|
||||
YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE,
|
||||
YarnConfiguration.
|
||||
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE);
|
||||
heartBeatIntervalMin =
|
||||
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MIN_MS,
|
||||
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS);
|
||||
heartBeatIntervalMax =
|
||||
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MAX_MS,
|
||||
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS);
|
||||
heartBeatIntervalSpeedupFactor =
|
||||
conf.getFloat(
|
||||
YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR,
|
||||
YarnConfiguration.
|
||||
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR);
|
||||
heartBeatIntervalSlowdownFactor =
|
||||
conf.getFloat(
|
||||
YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR,
|
||||
YarnConfiguration.
|
||||
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR);
|
||||
|
||||
if (nextHeartBeatInterval <= 0) {
|
||||
LOG.warn("HeartBeat interval: " + nextHeartBeatInterval
|
||||
+ " must be greater than 0, using default.");
|
||||
nextHeartBeatInterval =
|
||||
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS;
|
||||
}
|
||||
|
||||
if (heartBeatIntervalScalingEnable) {
|
||||
if (heartBeatIntervalMin <= 0
|
||||
|| heartBeatIntervalMin > heartBeatIntervalMax
|
||||
|| nextHeartBeatInterval < heartBeatIntervalMin
|
||||
|| nextHeartBeatInterval > heartBeatIntervalMax) {
|
||||
LOG.warn("Invalid NM Heartbeat Configuration. "
|
||||
+ "Required: 0 < minimum <= interval <= maximum. Got: 0 < "
|
||||
+ heartBeatIntervalMin + " <= "
|
||||
+ nextHeartBeatInterval + " <= "
|
||||
+ heartBeatIntervalMax
|
||||
+ " Setting min and max to configured interval.");
|
||||
heartBeatIntervalMin = nextHeartBeatInterval;
|
||||
heartBeatIntervalMax = nextHeartBeatInterval;
|
||||
}
|
||||
if (heartBeatIntervalSpeedupFactor < 0
|
||||
|| heartBeatIntervalSlowdownFactor < 0) {
|
||||
LOG.warn(
|
||||
"Heartbeat scaling factors must be >= 0 "
|
||||
+ " SpeedupFactor:" + heartBeatIntervalSpeedupFactor
|
||||
+ " SlowdownFactor:" + heartBeatIntervalSlowdownFactor
|
||||
+ ". Using Defaults");
|
||||
heartBeatIntervalSlowdownFactor =
|
||||
YarnConfiguration.
|
||||
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR;
|
||||
heartBeatIntervalSpeedupFactor =
|
||||
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR;
|
||||
}
|
||||
LOG.info("Heartbeat Scaling Configuration: "
|
||||
+ " defaultInterval:" + nextHeartBeatInterval
|
||||
+ " minimumInterval:" + heartBeatIntervalMin
|
||||
+ " maximumInterval:" + heartBeatIntervalMax
|
||||
+ " speedupFactor:" + heartBeatIntervalSpeedupFactor
|
||||
+ " slowdownFactor:" + heartBeatIntervalSlowdownFactor);
|
||||
}
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
super.serviceStart();
|
||||
|
@ -629,10 +705,17 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
}
|
||||
|
||||
// Heartbeat response
|
||||
long newInterval = nextHeartBeatInterval;
|
||||
if (heartBeatIntervalScalingEnable) {
|
||||
newInterval = rmNode.calculateHeartBeatInterval(
|
||||
nextHeartBeatInterval, heartBeatIntervalMin,
|
||||
heartBeatIntervalMax, heartBeatIntervalSpeedupFactor,
|
||||
heartBeatIntervalSlowdownFactor);
|
||||
}
|
||||
NodeHeartbeatResponse nodeHeartBeatResponse =
|
||||
YarnServerBuilderUtils.newNodeHeartbeatResponse(
|
||||
getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
|
||||
NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
|
||||
NodeAction.NORMAL, null, null, null, null, newInterval);
|
||||
rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
|
||||
|
||||
populateKeys(request, nodeHeartBeatResponse);
|
||||
|
|
|
@ -212,4 +212,8 @@ public interface RMNode {
|
|||
* @return all node attributes as a Set.
|
||||
*/
|
||||
Set<NodeAttribute> getAllNodeAttributes();
|
||||
|
||||
long calculateHeartBeatInterval(long defaultInterval,
|
||||
long minInterval, long maxInterval, float speedupFactor,
|
||||
float slowdownFactor);
|
||||
}
|
||||
|
|
|
@ -716,6 +716,48 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long calculateHeartBeatInterval(long defaultInterval, long minInterval,
|
||||
long maxInterval, float speedupFactor, float slowdownFactor) {
|
||||
|
||||
long newInterval = defaultInterval;
|
||||
|
||||
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
||||
float clusterUtil = metrics.getUtilizedVirtualCores()
|
||||
/ Math.max(1.0f, metrics.getCapabilityVirtualCores());
|
||||
|
||||
if (this.nodeUtilization != null && this.getPhysicalResource() != null) {
|
||||
// getCPU() returns utilization normalized to 1 cpu. getVirtualCores() on
|
||||
// a physicalResource returns number of physical cores. So,
|
||||
// nodeUtil will be CPU utilization of entire node.
|
||||
float nodeUtil = this.nodeUtilization.getCPU()
|
||||
/ Math.max(1.0f, this.getPhysicalResource().getVirtualCores());
|
||||
|
||||
// sanitize
|
||||
nodeUtil = Math.min(1.0f, Math.max(0.0f, nodeUtil));
|
||||
clusterUtil = Math.min(1.0f, Math.max(0.0f, clusterUtil));
|
||||
|
||||
if (nodeUtil > clusterUtil) {
|
||||
// Slow down - 20% more CPU utilization means slow down by 20% * factor
|
||||
newInterval = (long) (defaultInterval
|
||||
* (1.0f + (nodeUtil - clusterUtil) * slowdownFactor));
|
||||
} else {
|
||||
// Speed up - 20% less CPU utilization means speed up by 20% * factor
|
||||
newInterval = (long) (defaultInterval
|
||||
* (1.0f - (clusterUtil - nodeUtil) * speedupFactor));
|
||||
}
|
||||
newInterval =
|
||||
Math.min(maxInterval, Math.max(minInterval, newInterval));
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Setting heartbeatinterval to: " + newInterval
|
||||
+ " node:" + this.nodeId + " nodeUtil: " + nodeUtil
|
||||
+ " clusterUtil: " + clusterUtil);
|
||||
}
|
||||
}
|
||||
return newInterval;
|
||||
}
|
||||
|
||||
public void handle(RMNodeEvent event) {
|
||||
LOG.debug("Processing {} of type {}", event.getNodeId(), event.getType());
|
||||
writeLock.lock();
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -106,6 +107,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
|||
// Update cluster capacity
|
||||
Resources.addTo(clusterCapacity, node.getTotalResource());
|
||||
staleClusterCapacity = Resources.clone(clusterCapacity);
|
||||
ClusterMetrics.getMetrics().incrCapability(node.getTotalResource());
|
||||
|
||||
// Update maximumAllocation
|
||||
updateMaxResources(node, true);
|
||||
|
@ -201,6 +203,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
|||
// Update cluster capacity
|
||||
Resources.subtractFrom(clusterCapacity, node.getTotalResource());
|
||||
staleClusterCapacity = Resources.clone(clusterCapacity);
|
||||
ClusterMetrics.getMetrics().decrCapability(node.getTotalResource());
|
||||
|
||||
// Update maximumAllocation
|
||||
updateMaxResources(node, false);
|
||||
|
@ -493,4 +496,4 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
|||
}
|
||||
return nodesPerPartition;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -335,6 +335,13 @@ public class MockNodes {
|
|||
public Resource getPhysicalResource() {
|
||||
return this.physicalResource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long calculateHeartBeatInterval(
|
||||
long defaultInterval, long minInterval, long maxInterval,
|
||||
float speedupFactor, float slowdownFactor) {
|
||||
return defaultInterval;
|
||||
}
|
||||
};
|
||||
|
||||
private static RMNode buildRMNode(int rack, final Resource perNode,
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
|
@ -1167,4 +1168,112 @@ public class TestRMNodeTransitions {
|
|||
Assert.assertEquals(1, rmNode.getContainersToBeRemovedFromNM().size());
|
||||
|
||||
}
|
||||
|
||||
private void calcIntervalTest(RMNodeImpl rmNode, ResourceUtilization nodeUtil,
|
||||
long hbDefault, long hbMin, long hbMax, float speedup, float slowdown,
|
||||
float cpuUtil, long expectedHb) {
|
||||
nodeUtil.setCPU(cpuUtil);
|
||||
rmNode.setNodeUtilization(nodeUtil);
|
||||
long hbInterval = rmNode.calculateHeartBeatInterval(hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown);
|
||||
assertEquals("heartbeat interval incorrect", expectedHb, hbInterval);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCalculateHeartBeatInterval() {
|
||||
RMNodeImpl rmNode = getRunningNode();
|
||||
Resource nodeCapability = rmNode.getTotalCapability();
|
||||
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
||||
// Set cluster capability to 10 * nodeCapability
|
||||
int vcoreUnit = nodeCapability.getVirtualCores();
|
||||
rmNode.setPhysicalResource(nodeCapability);
|
||||
int clusterVcores = vcoreUnit * 10;
|
||||
metrics.incrCapability(
|
||||
Resource.newInstance(10 * nodeCapability.getMemorySize(),
|
||||
clusterVcores));
|
||||
|
||||
long hbDefault = 2000;
|
||||
long hbMin = 1500;
|
||||
long hbMax = 2500;
|
||||
float speedup = 1.0F;
|
||||
float slowdown = 1.0F;
|
||||
metrics.incrUtilizedVirtualCores(vcoreUnit * 5); // 50 % cluster util
|
||||
ResourceUtilization nodeUtil = ResourceUtilization.newInstance(
|
||||
1024, vcoreUnit, 0.0F * vcoreUnit); // 0% rmNode util
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.0F, hbMin); // 0%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.10F, hbMin); // 10%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.20F, hbMin); // 20%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.30F, 1600); // 30%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.40F, 1800); // 40%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.50F, hbDefault); // 50%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.60F, 2200); // 60%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.70F, 2400); // 70%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.80F, hbMax); // 80%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.90F, hbMax); // 90%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 1.0F, hbMax); // 100%
|
||||
|
||||
// Try with 50% speedup/slowdown factors
|
||||
speedup = 0.5F;
|
||||
slowdown = 0.5F;
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.0F, hbMin); // 0%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.10F, 1600); // 10%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.20F, 1700); // 20%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.30F, 1800); // 30%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.40F, 1900); // 40%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.50F, hbDefault); // 50%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.60F, 2100); // 60%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.70F, 2200); // 70%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.80F, 2300); // 80%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.90F, 2400); // 90%
|
||||
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 1.0F, hbMax); // 100%
|
||||
|
||||
// With Physical Resource null, it should always return default
|
||||
rmNode.setPhysicalResource(null);
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 0.1F, hbDefault); // 10%
|
||||
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
|
||||
speedup, slowdown, vcoreUnit * 1.0F, hbDefault); // 100%
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,11 +24,13 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -40,12 +42,19 @@ import static org.junit.Assert.assertEquals;
|
|||
*/
|
||||
public class TestClusterNodeTracker {
|
||||
private ClusterNodeTracker<FSSchedulerNode> nodeTracker;
|
||||
private ClusterMetrics metrics;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
metrics = ClusterMetrics.getMetrics();
|
||||
nodeTracker = new ClusterNodeTracker<>();
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() {
|
||||
ClusterMetrics.destroy();
|
||||
}
|
||||
|
||||
private void addEight4x4Nodes() {
|
||||
MockNodes.resetHostIds();
|
||||
List<RMNode> rmNodes =
|
||||
|
@ -65,6 +74,15 @@ public class TestClusterNodeTracker {
|
|||
4, nodeTracker.nodeCount("rack0"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrCapability() {
|
||||
addEight4x4Nodes();
|
||||
assertEquals("Cluster Capability Memory incorrect",
|
||||
metrics.getCapabilityMB(), (4096 * 8));
|
||||
assertEquals("Cluster Capability Vcores incorrect",
|
||||
metrics.getCapabilityVirtualCores(), 4 * 8);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetNodesForResourceName() throws Exception {
|
||||
addEight4x4Nodes();
|
||||
|
|
|
@ -219,3 +219,21 @@ The following parameters can be used to configure the container log dir sizes.
|
|||
| `yarn.nodemanager.container-log-monitor.interval-ms` | Positive integer | How often to check the usage of a container's log directories in milliseconds. Default is 60000 ms. |
|
||||
| `yarn.nodemanager.container-log-monitor.dir-size-limit-bytes` | Long | The disk space limit, in bytes, for a single container log directory. Default is 1000000000. |
|
||||
| `yarn.nodemanager.container-log-monitor.total-size-limit-bytes` | Long | The disk space limit, in bytes, for all of a container's logs. The default is 10000000000. |
|
||||
|
||||
Scale Heart-beat Interval Based on CPU Utilization
|
||||
-------------------------------------------------
|
||||
|
||||
This allows a cluster admin to configure a cluster to allow the heart-beat between the Resource Manager and each NodeManager to be scaled based on the CPU utilization of the node compared to the overall CPU utilization of the cluster.
|
||||
|
||||
### Configuration
|
||||
|
||||
The following parameters can be used to configure the heart-beat interval and whether and how it scales.
|
||||
|
||||
| Configuration Name | Allowed Values | Description |
|
||||
|:---- |:---- |:---- |
|
||||
| `yarn.resourcemanager.nodemanagers.heartbeat-interval-ms` | Long | Specifies the default heart-beat interval in milliseconds for every NodeManager in the cluster. Default is 1000 ms. |
|
||||
| `yarn.resourcemanager.nodemanagers.heartbeat-interval-scaling-enable` | true, false | Enables heart-beat interval scaling. If true, The NodeManager heart-beat interval will scale based on the difference between the CPU utilization on the node and the cluster-wide average CPU utilization. Default is false. |
|
||||
| `yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms` | Positive Long | If heart-beat interval scaling is enabled, this is the minimum heart-beat interval in milliseconds. Default is 1000 ms. |
|
||||
| `yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms` | Positive Long | If heart-beat interval scaling is enabled, this is the maximum heart-beat interval in milliseconds. Default is 1000 ms. |
|
||||
| `yarn.resourcemanager.nodemanagers.heartbeat-interval-speedup-factor` | Positive Float | If heart-beat interval scaling is enabled, this controls the degree of adjustment when speeding up heartbeat intervals. At 1.0, 20% less than the average cluster-wide CPU utilization will result in a 20% decrease in the heartbeat interval. Default is 1.0. |
|
||||
| `yarn.resourcemanager.nodemanagers.heartbeat-interval-slowdown-factor` | Positive Float | If heart-beat interval scaling is enabled, this controls the degree of adjustment when slowing down heartbeat intervals. At 1.0, 20% greater than the average cluster-wide CPU utilization will result in a 20% increase in the heartbeat interval. Default is 1.0. |
|
||||
|
|
Loading…
Reference in New Issue