YARN-10475: Scale RM-NM heartbeat interval based on node utilization. Contributed by Jim Brennan (Jim_Brennan).

(cherry picked from commit 31154fdde5)
This commit is contained in:
Eric E Payne 2020-11-02 17:33:57 +00:00
parent 42fab7897a
commit 052b9799c0
14 changed files with 418 additions and 13 deletions

View File

@ -228,6 +228,13 @@ public class NodeInfo {
public Resource getPhysicalResource() {
return null;
}
@Override
public long calculateHeartBeatInterval(
long defaultInterval, long minInterval, long maxInterval,
float speedupFactor, float slowdownFactor) {
return defaultInterval;
}
}
public static RMNode newNodeInfo(String rackName, String hostName,

View File

@ -216,4 +216,11 @@ public class RMNodeWrapper implements RMNode {
public Resource getPhysicalResource() {
return null;
}
@Override
public long calculateHeartBeatInterval(
long defaultInterval, long minInterval, long maxInterval,
float speedupFactor, float slowdownFactor) {
return defaultInterval;
}
}

View File

@ -667,6 +667,30 @@ public class YarnConfiguration extends Configuration {
RM_PREFIX + "nodemanagers.heartbeat-interval-ms";
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000;
/** Enable Heartbeat Interval Scaling based on cpu utilization. */
public static final String RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE =
RM_PREFIX + "nodemanagers.heartbeat-interval-scaling-enable";
public static final boolean
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE = false;
public static final String RM_NM_HEARTBEAT_INTERVAL_MIN_MS =
RM_PREFIX + "nodemanagers.heartbeat-interval-min-ms";
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS = 1000;
public static final String RM_NM_HEARTBEAT_INTERVAL_MAX_MS =
RM_PREFIX + "nodemanagers.heartbeat-interval-max-ms";
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS = 1000;
public static final String RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR =
RM_PREFIX + "nodemanagers.heartbeat-interval-speedup-factor";
public static final float
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR = 1.0f;
public static final String RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR =
RM_PREFIX + "nodemanagers.heartbeat-interval-slowdown-factor";
public static final float
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR = 1.0f;
/** Number of worker threads that write the history data. */
public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size";

View File

@ -834,6 +834,56 @@
<value>1000</value>
</property>
<property>
<description>Enables heart-beat interval scaling. The NodeManager
heart-beat interval will scale based on the difference between the CPU
utilization on the node and the cluster-wide average CPU utilization.
</description>
<name>
yarn.resourcemanager.nodemanagers.heartbeat-interval-scaling-enable
</name>
<value>false</value>
</property>
<property>
<description>If heart-beat interval scaling is enabled, this is the
minimum heart-beat interval in milliseconds
</description>
<name>yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms</name>
<value>1000</value>
</property>
<property>
<description>If heart-beat interval scaling is enabled, this is the
maximum heart-beat interval in milliseconds</description>
<name>yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms</name>
<value>1000</value>
</property>
<property>
<description>If heart-beat interval scaling is enabled, this controls
the degree of adjustment when speeding up heartbeat intervals.
At 1.0, 20% less than average CPU utilization will result in a 20%
decrease in heartbeat interval.
</description>
<name>
yarn.resourcemanager.nodemanagers.heartbeat-interval-speedup-factor
</name>
<value>1.0</value>
</property>
<property>
<description>If heart-beat interval scaling is enabled, this controls
the degree of adjustment when slowing down heartbeat intervals.
At 1.0, 20% greater than average CPU utilization will result in a 20%
increase in heartbeat interval.
</description>
<name>
yarn.resourcemanager.nodemanagers.heartbeat-interval-slowdown-factor
</name>
<value>1.0</value>
</property>
<property>
<description>The minimum allowed version of a connecting nodemanager. The valid values are
NONE (no version checking), EqualToRM (the nodemanager's version is equal to

View File

@ -722,6 +722,14 @@ public class AdminService extends CompositeService implements
// refresh dynamic resource in ResourceTrackerService
this.rm.getRMContext().getResourceTrackerService().
updateDynamicResourceConfiguration(newConf);
// Update our heartbeat configuration as well
Configuration ysconf =
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
this.rm.getRMContext().getResourceTrackerService()
.updateHeartBeatConfiguration(ysconf);
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
return response;

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.yarn.api.records.Resource;
import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
@ -53,6 +54,8 @@ public class ClusterMetrics {
private MutableRate aMContainerAllocationDelay;
@Metric("Memory Utilization") MutableGaugeLong utilizedMB;
@Metric("Vcore Utilization") MutableGaugeLong utilizedVirtualCores;
@Metric("Memory Capability") MutableGaugeLong capabilityMB;
@Metric("Vcore Capability") MutableGaugeLong capabilityVirtualCores;
private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
"Metrics for the Yarn Cluster");
@ -83,7 +86,7 @@ public class ClusterMetrics {
}
@VisibleForTesting
synchronized static void destroy() {
public synchronized static void destroy() {
isInitialized.set(false);
INSTANCE = null;
}
@ -195,6 +198,28 @@ public class ClusterMetrics {
aMRegisterDelay.add(delay);
}
public long getCapabilityMB() {
return capabilityMB.value();
}
public long getCapabilityVirtualCores() {
return capabilityVirtualCores.value();
}
public void incrCapability(Resource res) {
if (res != null) {
capabilityMB.incr(res.getMemorySize());
capabilityVirtualCores.incr(res.getVirtualCores());
}
}
public void decrCapability(Resource res) {
if (res != null) {
capabilityMB.decr(res.getMemorySize());
capabilityVirtualCores.decr(res.getVirtualCores());
}
}
public void addAMContainerAllocationDelay(long delay) {
aMContainerAllocationDelay.add(delay);
}

View File

@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -110,6 +109,13 @@ public class ResourceTrackerService extends AbstractService implements
private final WriteLock writeLock;
private long nextHeartBeatInterval;
private boolean heartBeatIntervalScalingEnable;
private long heartBeatIntervalMin;
private long heartBeatIntervalMax;
private float heartBeatIntervalSpeedupFactor;
private float heartBeatIntervalSlowdownFactor;
private Server server;
private InetSocketAddress resourceTrackerAddress;
private String minimumNodeManagerVersion;
@ -152,14 +158,6 @@ public class ResourceTrackerService extends AbstractService implements
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
RackResolver.init(conf);
nextHeartBeatInterval =
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
if (nextHeartBeatInterval <= 0) {
throw new YarnRuntimeException("Invalid Configuration. "
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
+ " should be larger than 0.");
}
minAllocMb = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
@ -180,7 +178,7 @@ public class ResourceTrackerService extends AbstractService implements
isDelegatedCentralizedNodeLabelsConf =
YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf);
}
updateHeartBeatConfiguration(conf);
loadDynamicResourceConfiguration(conf);
decommissioningWatcher.init(conf);
super.serviceInit(conf);
@ -225,6 +223,84 @@ public class ResourceTrackerService extends AbstractService implements
}
}
/**
* Update HearBeatConfiguration with new configuration.
* @param conf Yarn Configuration
*/
public void updateHeartBeatConfiguration(Configuration conf) {
this.writeLock.lock();
try {
nextHeartBeatInterval =
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
heartBeatIntervalScalingEnable =
conf.getBoolean(
YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE,
YarnConfiguration.
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE);
heartBeatIntervalMin =
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MIN_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS);
heartBeatIntervalMax =
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MAX_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS);
heartBeatIntervalSpeedupFactor =
conf.getFloat(
YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR,
YarnConfiguration.
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR);
heartBeatIntervalSlowdownFactor =
conf.getFloat(
YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR,
YarnConfiguration.
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR);
if (nextHeartBeatInterval <= 0) {
LOG.warn("HeartBeat interval: " + nextHeartBeatInterval
+ " must be greater than 0, using default.");
nextHeartBeatInterval =
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS;
}
if (heartBeatIntervalScalingEnable) {
if (heartBeatIntervalMin <= 0
|| heartBeatIntervalMin > heartBeatIntervalMax
|| nextHeartBeatInterval < heartBeatIntervalMin
|| nextHeartBeatInterval > heartBeatIntervalMax) {
LOG.warn("Invalid NM Heartbeat Configuration. "
+ "Required: 0 < minimum <= interval <= maximum. Got: 0 < "
+ heartBeatIntervalMin + " <= "
+ nextHeartBeatInterval + " <= "
+ heartBeatIntervalMax
+ " Setting min and max to configured interval.");
heartBeatIntervalMin = nextHeartBeatInterval;
heartBeatIntervalMax = nextHeartBeatInterval;
}
if (heartBeatIntervalSpeedupFactor < 0
|| heartBeatIntervalSlowdownFactor < 0) {
LOG.warn(
"Heartbeat scaling factors must be >= 0 "
+ " SpeedupFactor:" + heartBeatIntervalSpeedupFactor
+ " SlowdownFactor:" + heartBeatIntervalSlowdownFactor
+ ". Using Defaults");
heartBeatIntervalSlowdownFactor =
YarnConfiguration.
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR;
heartBeatIntervalSpeedupFactor =
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR;
}
LOG.info("Heartbeat Scaling Configuration: "
+ " defaultInterval:" + nextHeartBeatInterval
+ " minimumInterval:" + heartBeatIntervalMin
+ " maximumInterval:" + heartBeatIntervalMax
+ " speedupFactor:" + heartBeatIntervalSpeedupFactor
+ " slowdownFactor:" + heartBeatIntervalSlowdownFactor);
}
} finally {
this.writeLock.unlock();
}
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
@ -588,10 +664,17 @@ public class ResourceTrackerService extends AbstractService implements
}
// Heartbeat response
long newInterval = nextHeartBeatInterval;
if (heartBeatIntervalScalingEnable) {
newInterval = rmNode.calculateHeartBeatInterval(
nextHeartBeatInterval, heartBeatIntervalMin,
heartBeatIntervalMax, heartBeatIntervalSpeedupFactor,
heartBeatIntervalSlowdownFactor);
}
NodeHeartbeatResponse nodeHeartBeatResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(
getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
NodeAction.NORMAL, null, null, null, null, newInterval);
rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
populateKeys(request, nodeHeartBeatResponse);

View File

@ -195,4 +195,8 @@ public interface RMNode {
* @return the RM context associated with this RM node.
*/
RMContext getRMContext();
long calculateHeartBeatInterval(long defaultInterval,
long minInterval, long maxInterval, float speedupFactor,
float slowdownFactor);
}

View File

@ -690,6 +690,48 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
}
@Override
public long calculateHeartBeatInterval(long defaultInterval, long minInterval,
long maxInterval, float speedupFactor, float slowdownFactor) {
long newInterval = defaultInterval;
ClusterMetrics metrics = ClusterMetrics.getMetrics();
float clusterUtil = metrics.getUtilizedVirtualCores()
/ Math.max(1.0f, metrics.getCapabilityVirtualCores());
if (this.nodeUtilization != null && this.getPhysicalResource() != null) {
// getCPU() returns utilization normalized to 1 cpu. getVirtualCores() on
// a physicalResource returns number of physical cores. So,
// nodeUtil will be CPU utilization of entire node.
float nodeUtil = this.nodeUtilization.getCPU()
/ Math.max(1.0f, this.getPhysicalResource().getVirtualCores());
// sanitize
nodeUtil = Math.min(1.0f, Math.max(0.0f, nodeUtil));
clusterUtil = Math.min(1.0f, Math.max(0.0f, clusterUtil));
if (nodeUtil > clusterUtil) {
// Slow down - 20% more CPU utilization means slow down by 20% * factor
newInterval = (long) (defaultInterval
* (1.0f + (nodeUtil - clusterUtil) * slowdownFactor));
} else {
// Speed up - 20% less CPU utilization means speed up by 20% * factor
newInterval = (long) (defaultInterval
* (1.0f - (clusterUtil - nodeUtil) * speedupFactor));
}
newInterval =
Math.min(maxInterval, Math.max(minInterval, newInterval));
if (LOG.isDebugEnabled()) {
LOG.debug("Setting heartbeatinterval to: " + newInterval
+ " node:" + this.nodeId + " nodeUtil: " + nodeUtil
+ " clusterUtil: " + clusterUtil);
}
}
return newInterval;
}
public void handle(RMNodeEvent event) {
LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
try {

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@ -93,6 +94,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
// Update cluster capacity
Resources.addTo(clusterCapacity, node.getTotalResource());
staleClusterCapacity = Resources.clone(clusterCapacity);
ClusterMetrics.getMetrics().incrCapability(node.getTotalResource());
// Update maximumAllocation
updateMaxResources(node, true);
@ -178,6 +180,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
// Update cluster capacity
Resources.subtractFrom(clusterCapacity, node.getTotalResource());
staleClusterCapacity = Resources.clone(clusterCapacity);
ClusterMetrics.getMetrics().decrCapability(node.getTotalResource());
// Update maximumAllocation
updateMaxResources(node, false);

View File

@ -301,6 +301,13 @@ public class MockNodes {
public Resource getPhysicalResource() {
return this.physicalResource;
}
@Override
public long calculateHeartBeatInterval(
long defaultInterval, long minInterval, long maxInterval,
float speedupFactor, float slowdownFactor) {
return defaultInterval;
}
};
private static RMNode buildRMNode(int rack, final Resource perNode,

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@ -1104,4 +1105,112 @@ public class TestRMNodeTransitions {
Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size());
Assert.assertEquals(0, node.getCompletedContainers().size());
}
private void calcIntervalTest(RMNodeImpl rmNode, ResourceUtilization nodeUtil,
long hbDefault, long hbMin, long hbMax, float speedup, float slowdown,
float cpuUtil, long expectedHb) {
nodeUtil.setCPU(cpuUtil);
rmNode.setNodeUtilization(nodeUtil);
long hbInterval = rmNode.calculateHeartBeatInterval(hbDefault, hbMin, hbMax,
speedup, slowdown);
assertEquals("heartbeat interval incorrect", expectedHb, hbInterval);
}
@Test
public void testCalculateHeartBeatInterval() {
RMNodeImpl rmNode = getRunningNode();
Resource nodeCapability = rmNode.getTotalCapability();
ClusterMetrics metrics = ClusterMetrics.getMetrics();
// Set cluster capability to 10 * nodeCapability
int vcoreUnit = nodeCapability.getVirtualCores();
rmNode.setPhysicalResource(nodeCapability);
int clusterVcores = vcoreUnit * 10;
metrics.incrCapability(
Resource.newInstance(10 * nodeCapability.getMemorySize(),
clusterVcores));
long hbDefault = 2000;
long hbMin = 1500;
long hbMax = 2500;
float speedup = 1.0F;
float slowdown = 1.0F;
metrics.incrUtilizedVirtualCores(vcoreUnit * 5); // 50 % cluster util
ResourceUtilization nodeUtil = ResourceUtilization.newInstance(
1024, vcoreUnit, 0.0F * vcoreUnit); // 0% rmNode util
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.0F, hbMin); // 0%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.10F, hbMin); // 10%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.20F, hbMin); // 20%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.30F, 1600); // 30%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.40F, 1800); // 40%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.50F, hbDefault); // 50%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.60F, 2200); // 60%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.70F, 2400); // 70%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.80F, hbMax); // 80%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.90F, hbMax); // 90%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 1.0F, hbMax); // 100%
// Try with 50% speedup/slowdown factors
speedup = 0.5F;
slowdown = 0.5F;
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.0F, hbMin); // 0%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.10F, 1600); // 10%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.20F, 1700); // 20%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.30F, 1800); // 30%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.40F, 1900); // 40%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.50F, hbDefault); // 50%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.60F, 2100); // 60%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.70F, 2200); // 70%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.80F, 2300); // 80%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.90F, 2400); // 90%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 1.0F, hbMax); // 100%
// With Physical Resource null, it should always return default
rmNode.setPhysicalResource(null);
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 0.1F, hbDefault); // 10%
calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
speedup, slowdown, vcoreUnit * 1.0F, hbDefault); // 100%
}
}

View File

@ -24,11 +24,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -40,12 +42,19 @@ import static org.junit.Assert.assertEquals;
*/
public class TestClusterNodeTracker {
private ClusterNodeTracker<FSSchedulerNode> nodeTracker;
private ClusterMetrics metrics;
@Before
public void setup() {
metrics = ClusterMetrics.getMetrics();
nodeTracker = new ClusterNodeTracker<>();
}
@After
public void teardown() {
ClusterMetrics.destroy();
}
private void addEight4x4Nodes() {
MockNodes.resetHostIds();
List<RMNode> rmNodes =
@ -65,6 +74,15 @@ public class TestClusterNodeTracker {
4, nodeTracker.nodeCount("rack0"));
}
@Test
public void testIncrCapability() {
addEight4x4Nodes();
assertEquals("Cluster Capability Memory incorrect",
metrics.getCapabilityMB(), (4096 * 8));
assertEquals("Cluster Capability Vcores incorrect",
metrics.getCapabilityVirtualCores(), 4 * 8);
}
@Test
public void testGetNodesForResourceName() throws Exception {
addEight4x4Nodes();

View File

@ -146,4 +146,22 @@ The following settings need to be set in *yarn-site.xml*.
<property>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
</property>
Scale Heart-beat Interval Based on CPU Utilization
-------------------------------------------------
This allows a cluster admin to configure a cluster to allow the heart-beat between the Resource Manager and each NodeManager to be scaled based on the CPU utilization of the node compared to the overall CPU utilization of the cluster.
### Configuration
The following parameters can be used to configure the heart-beat interval and whether and how it scales.
| Configuration Name | Allowed Values | Description |
|:---- |:---- |:---- |
| `yarn.resourcemanager.nodemanagers.heartbeat-interval-ms` | Long | Specifies the default heart-beat interval in milliseconds for every NodeManager in the cluster. Default is 1000 ms. |
| `yarn.resourcemanager.nodemanagers.heartbeat-interval-scaling-enable` | true, false | Enables heart-beat interval scaling. If true, The NodeManager heart-beat interval will scale based on the difference between the CPU utilization on the node and the cluster-wide average CPU utilization. Default is false. |
| `yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms` | Positive Long | If heart-beat interval scaling is enabled, this is the minimum heart-beat interval in milliseconds. Default is 1000 ms. |
| `yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms` | Positive Long | If heart-beat interval scaling is enabled, this is the maximum heart-beat interval in milliseconds. Default is 1000 ms. |
| `yarn.resourcemanager.nodemanagers.heartbeat-interval-speedup-factor` | Positive Float | If heart-beat interval scaling is enabled, this controls the degree of adjustment when speeding up heartbeat intervals. At 1.0, 20% less than the average cluster-wide CPU utilization will result in a 20% decrease in the heartbeat interval. Default is 1.0. |
| `yarn.resourcemanager.nodemanagers.heartbeat-interval-slowdown-factor` | Positive Float | If heart-beat interval scaling is enabled, this controls the degree of adjustment when slowing down heartbeat intervals. At 1.0, 20% greater than the average cluster-wide CPU utilization will result in a 20% increase in the heartbeat interval. Default is 1.0. |