diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 69946c88bef..b7d6ef44ca9 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -228,6 +228,13 @@ public class NodeInfo { public Resource getPhysicalResource() { return null; } + + @Override + public long calculateHeartBeatInterval( + long defaultInterval, long minInterval, long maxInterval, + float speedupFactor, float slowdownFactor) { + return defaultInterval; + } } public static RMNode newNodeInfo(String rackName, String hostName, diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index a96b7901bfc..346216accf6 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -216,4 +216,11 @@ public class RMNodeWrapper implements RMNode { public Resource getPhysicalResource() { return null; } + + @Override + public long calculateHeartBeatInterval( + long defaultInterval, long minInterval, long maxInterval, + float speedupFactor, float slowdownFactor) { + return defaultInterval; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c4b9017543c..a7123ff814e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -667,6 +667,30 @@ public class YarnConfiguration extends Configuration { RM_PREFIX + "nodemanagers.heartbeat-interval-ms"; public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000; + /** Enable Heartbeat Interval Scaling based on cpu utilization. */ + public static final String RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE = + RM_PREFIX + "nodemanagers.heartbeat-interval-scaling-enable"; + public static final boolean + DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE = false; + + public static final String RM_NM_HEARTBEAT_INTERVAL_MIN_MS = + RM_PREFIX + "nodemanagers.heartbeat-interval-min-ms"; + public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS = 1000; + + public static final String RM_NM_HEARTBEAT_INTERVAL_MAX_MS = + RM_PREFIX + "nodemanagers.heartbeat-interval-max-ms"; + public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS = 1000; + + public static final String RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR = + RM_PREFIX + "nodemanagers.heartbeat-interval-speedup-factor"; + public static final float + DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR = 1.0f; + + public static final String RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR = + RM_PREFIX + "nodemanagers.heartbeat-interval-slowdown-factor"; + public static final float + DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR = 1.0f; + /** Number of worker threads that write the history data. */ public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE = RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 132e8356ef6..e6904fbc604 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -834,6 +834,56 @@ 1000 + + Enables heart-beat interval scaling. The NodeManager + heart-beat interval will scale based on the difference between the CPU + utilization on the node and the cluster-wide average CPU utilization. + + + yarn.resourcemanager.nodemanagers.heartbeat-interval-scaling-enable + + false + + + + If heart-beat interval scaling is enabled, this is the + minimum heart-beat interval in milliseconds + + yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms + 1000 + + + + If heart-beat interval scaling is enabled, this is the + maximum heart-beat interval in milliseconds + yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms + 1000 + + + + If heart-beat interval scaling is enabled, this controls + the degree of adjustment when speeding up heartbeat intervals. + At 1.0, 20% less than average CPU utilization will result in a 20% + decrease in heartbeat interval. + + + yarn.resourcemanager.nodemanagers.heartbeat-interval-speedup-factor + + 1.0 + + + + If heart-beat interval scaling is enabled, this controls + the degree of adjustment when slowing down heartbeat intervals. + At 1.0, 20% greater than average CPU utilization will result in a 20% + increase in heartbeat interval. + + + yarn.resourcemanager.nodemanagers.heartbeat-interval-slowdown-factor + + 1.0 + + The minimum allowed version of a connecting nodemanager. The valid values are NONE (no version checking), EqualToRM (the nodemanager's version is equal to diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index b9b57e42cf2..2e78f19184b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -722,6 +722,14 @@ public class AdminService extends CompositeService implements // refresh dynamic resource in ResourceTrackerService this.rm.getRMContext().getResourceTrackerService(). updateDynamicResourceConfiguration(newConf); + + // Update our heartbeat configuration as well + Configuration ysconf = + getConfiguration(new Configuration(false), + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + this.rm.getRMContext().getResourceTrackerService() + .updateHeartBeatConfiguration(ysconf); + RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); return response; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java index 77245375c8c..764d80722ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java @@ -32,6 +32,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableRate; +import org.apache.hadoop.yarn.api.records.Resource; import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private @@ -53,6 +54,8 @@ public class ClusterMetrics { private MutableRate aMContainerAllocationDelay; @Metric("Memory Utilization") MutableGaugeLong utilizedMB; @Metric("Vcore Utilization") MutableGaugeLong utilizedVirtualCores; + @Metric("Memory Capability") MutableGaugeLong capabilityMB; + @Metric("Vcore Capability") MutableGaugeLong capabilityVirtualCores; private static final MetricsInfo RECORD_INFO = info("ClusterMetrics", "Metrics for the Yarn Cluster"); @@ -83,7 +86,7 @@ public class ClusterMetrics { } @VisibleForTesting - synchronized static void destroy() { + public synchronized static void destroy() { isInitialized.set(false); INSTANCE = null; } @@ -195,6 +198,28 @@ public class ClusterMetrics { aMRegisterDelay.add(delay); } + public long getCapabilityMB() { + return capabilityMB.value(); + } + + public long getCapabilityVirtualCores() { + return capabilityVirtualCores.value(); + } + + public void incrCapability(Resource res) { + if (res != null) { + capabilityMB.incr(res.getMemorySize()); + capabilityVirtualCores.incr(res.getVirtualCores()); + } + } + + public void decrCapability(Resource res) { + if (res != null) { + capabilityMB.decr(res.getMemorySize()); + capabilityVirtualCores.decr(res.getVirtualCores()); + } + } + public void addAMContainerAllocationDelay(long delay) { aMContainerAllocationDelay.add(delay); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index ce39c43fcb1..5d691bebaed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -110,6 +109,13 @@ public class ResourceTrackerService extends AbstractService implements private final WriteLock writeLock; private long nextHeartBeatInterval; + private boolean heartBeatIntervalScalingEnable; + private long heartBeatIntervalMin; + private long heartBeatIntervalMax; + private float heartBeatIntervalSpeedupFactor; + private float heartBeatIntervalSlowdownFactor; + + private Server server; private InetSocketAddress resourceTrackerAddress; private String minimumNodeManagerVersion; @@ -152,14 +158,6 @@ public class ResourceTrackerService extends AbstractService implements YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); RackResolver.init(conf); - nextHeartBeatInterval = - conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, - YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); - if (nextHeartBeatInterval <= 0) { - throw new YarnRuntimeException("Invalid Configuration. " - + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS - + " should be larger than 0."); - } minAllocMb = conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, @@ -180,7 +178,7 @@ public class ResourceTrackerService extends AbstractService implements isDelegatedCentralizedNodeLabelsConf = YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf); } - + updateHeartBeatConfiguration(conf); loadDynamicResourceConfiguration(conf); decommissioningWatcher.init(conf); super.serviceInit(conf); @@ -225,6 +223,84 @@ public class ResourceTrackerService extends AbstractService implements } } + /** + * Update HearBeatConfiguration with new configuration. + * @param conf Yarn Configuration + */ + public void updateHeartBeatConfiguration(Configuration conf) { + this.writeLock.lock(); + try { + nextHeartBeatInterval = + conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); + heartBeatIntervalScalingEnable = + conf.getBoolean( + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE, + YarnConfiguration. + DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE); + heartBeatIntervalMin = + conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MIN_MS, + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS); + heartBeatIntervalMax = + conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MAX_MS, + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS); + heartBeatIntervalSpeedupFactor = + conf.getFloat( + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR, + YarnConfiguration. + DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR); + heartBeatIntervalSlowdownFactor = + conf.getFloat( + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR, + YarnConfiguration. + DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR); + + if (nextHeartBeatInterval <= 0) { + LOG.warn("HeartBeat interval: " + nextHeartBeatInterval + + " must be greater than 0, using default."); + nextHeartBeatInterval = + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS; + } + + if (heartBeatIntervalScalingEnable) { + if (heartBeatIntervalMin <= 0 + || heartBeatIntervalMin > heartBeatIntervalMax + || nextHeartBeatInterval < heartBeatIntervalMin + || nextHeartBeatInterval > heartBeatIntervalMax) { + LOG.warn("Invalid NM Heartbeat Configuration. " + + "Required: 0 < minimum <= interval <= maximum. Got: 0 < " + + heartBeatIntervalMin + " <= " + + nextHeartBeatInterval + " <= " + + heartBeatIntervalMax + + " Setting min and max to configured interval."); + heartBeatIntervalMin = nextHeartBeatInterval; + heartBeatIntervalMax = nextHeartBeatInterval; + } + if (heartBeatIntervalSpeedupFactor < 0 + || heartBeatIntervalSlowdownFactor < 0) { + LOG.warn( + "Heartbeat scaling factors must be >= 0 " + + " SpeedupFactor:" + heartBeatIntervalSpeedupFactor + + " SlowdownFactor:" + heartBeatIntervalSlowdownFactor + + ". Using Defaults"); + heartBeatIntervalSlowdownFactor = + YarnConfiguration. + DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR; + heartBeatIntervalSpeedupFactor = + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR; + } + LOG.info("Heartbeat Scaling Configuration: " + + " defaultInterval:" + nextHeartBeatInterval + + " minimumInterval:" + heartBeatIntervalMin + + " maximumInterval:" + heartBeatIntervalMax + + " speedupFactor:" + heartBeatIntervalSpeedupFactor + + " slowdownFactor:" + heartBeatIntervalSlowdownFactor); + } + } finally { + this.writeLock.unlock(); + } + } + @Override protected void serviceStart() throws Exception { super.serviceStart(); @@ -588,10 +664,17 @@ public class ResourceTrackerService extends AbstractService implements } // Heartbeat response + long newInterval = nextHeartBeatInterval; + if (heartBeatIntervalScalingEnable) { + newInterval = rmNode.calculateHeartBeatInterval( + nextHeartBeatInterval, heartBeatIntervalMin, + heartBeatIntervalMax, heartBeatIntervalSpeedupFactor, + heartBeatIntervalSlowdownFactor); + } NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils.newNodeHeartbeatResponse( getNextResponseId(lastNodeHeartbeatResponse.getResponseId()), - NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval); + NodeAction.NORMAL, null, null, null, null, newInterval); rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 68a780e3d6e..fd85e4f31f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -195,4 +195,8 @@ public interface RMNode { * @return the RM context associated with this RM node. */ RMContext getRMContext(); + + long calculateHeartBeatInterval(long defaultInterval, + long minInterval, long maxInterval, float speedupFactor, + float slowdownFactor); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 77af132e1fc..a2534616e84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -690,6 +690,48 @@ public class RMNodeImpl implements RMNode, EventHandler { } } + @Override + public long calculateHeartBeatInterval(long defaultInterval, long minInterval, + long maxInterval, float speedupFactor, float slowdownFactor) { + + long newInterval = defaultInterval; + + ClusterMetrics metrics = ClusterMetrics.getMetrics(); + float clusterUtil = metrics.getUtilizedVirtualCores() + / Math.max(1.0f, metrics.getCapabilityVirtualCores()); + + if (this.nodeUtilization != null && this.getPhysicalResource() != null) { + // getCPU() returns utilization normalized to 1 cpu. getVirtualCores() on + // a physicalResource returns number of physical cores. So, + // nodeUtil will be CPU utilization of entire node. + float nodeUtil = this.nodeUtilization.getCPU() + / Math.max(1.0f, this.getPhysicalResource().getVirtualCores()); + + // sanitize + nodeUtil = Math.min(1.0f, Math.max(0.0f, nodeUtil)); + clusterUtil = Math.min(1.0f, Math.max(0.0f, clusterUtil)); + + if (nodeUtil > clusterUtil) { + // Slow down - 20% more CPU utilization means slow down by 20% * factor + newInterval = (long) (defaultInterval + * (1.0f + (nodeUtil - clusterUtil) * slowdownFactor)); + } else { + // Speed up - 20% less CPU utilization means speed up by 20% * factor + newInterval = (long) (defaultInterval + * (1.0f - (clusterUtil - nodeUtil) * speedupFactor)); + } + newInterval = + Math.min(maxInterval, Math.max(minInterval, newInterval)); + + if (LOG.isDebugEnabled()) { + LOG.debug("Setting heartbeatinterval to: " + newInterval + + " node:" + this.nodeId + " nodeUtil: " + nodeUtil + + " clusterUtil: " + clusterUtil); + } + } + return newInterval; + } + public void handle(RMNodeEvent event) { LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType()); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index 704893a04a2..661d829b292 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -93,6 +94,7 @@ public class ClusterNodeTracker { // Update cluster capacity Resources.addTo(clusterCapacity, node.getTotalResource()); staleClusterCapacity = Resources.clone(clusterCapacity); + ClusterMetrics.getMetrics().incrCapability(node.getTotalResource()); // Update maximumAllocation updateMaxResources(node, true); @@ -178,6 +180,7 @@ public class ClusterNodeTracker { // Update cluster capacity Resources.subtractFrom(clusterCapacity, node.getTotalResource()); staleClusterCapacity = Resources.clone(clusterCapacity); + ClusterMetrics.getMetrics().decrCapability(node.getTotalResource()); // Update maximumAllocation updateMaxResources(node, false); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 36fb3201134..dc9d27a8073 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -301,6 +301,13 @@ public class MockNodes { public Resource getPhysicalResource() { return this.physicalResource; } + + @Override + public long calculateHeartBeatInterval( + long defaultInterval, long minInterval, long maxInterval, + float speedupFactor, float slowdownFactor) { + return defaultInterval; + } }; private static RMNode buildRMNode(int rack, final Resource perNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 487d226072d..5b84b9e8985 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -1104,4 +1105,112 @@ public class TestRMNodeTransitions { Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size()); Assert.assertEquals(0, node.getCompletedContainers().size()); } + + private void calcIntervalTest(RMNodeImpl rmNode, ResourceUtilization nodeUtil, + long hbDefault, long hbMin, long hbMax, float speedup, float slowdown, + float cpuUtil, long expectedHb) { + nodeUtil.setCPU(cpuUtil); + rmNode.setNodeUtilization(nodeUtil); + long hbInterval = rmNode.calculateHeartBeatInterval(hbDefault, hbMin, hbMax, + speedup, slowdown); + assertEquals("heartbeat interval incorrect", expectedHb, hbInterval); + } + + @Test + public void testCalculateHeartBeatInterval() { + RMNodeImpl rmNode = getRunningNode(); + Resource nodeCapability = rmNode.getTotalCapability(); + ClusterMetrics metrics = ClusterMetrics.getMetrics(); + // Set cluster capability to 10 * nodeCapability + int vcoreUnit = nodeCapability.getVirtualCores(); + rmNode.setPhysicalResource(nodeCapability); + int clusterVcores = vcoreUnit * 10; + metrics.incrCapability( + Resource.newInstance(10 * nodeCapability.getMemorySize(), + clusterVcores)); + + long hbDefault = 2000; + long hbMin = 1500; + long hbMax = 2500; + float speedup = 1.0F; + float slowdown = 1.0F; + metrics.incrUtilizedVirtualCores(vcoreUnit * 5); // 50 % cluster util + ResourceUtilization nodeUtil = ResourceUtilization.newInstance( + 1024, vcoreUnit, 0.0F * vcoreUnit); // 0% rmNode util + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.0F, hbMin); // 0% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.10F, hbMin); // 10% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.20F, hbMin); // 20% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.30F, 1600); // 30% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.40F, 1800); // 40% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.50F, hbDefault); // 50% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.60F, 2200); // 60% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.70F, 2400); // 70% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.80F, hbMax); // 80% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.90F, hbMax); // 90% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 1.0F, hbMax); // 100% + + // Try with 50% speedup/slowdown factors + speedup = 0.5F; + slowdown = 0.5F; + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.0F, hbMin); // 0% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.10F, 1600); // 10% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.20F, 1700); // 20% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.30F, 1800); // 30% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.40F, 1900); // 40% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.50F, hbDefault); // 50% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.60F, 2100); // 60% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.70F, 2200); // 70% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.80F, 2300); // 80% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.90F, 2400); // 90% + + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 1.0F, hbMax); // 100% + + // With Physical Resource null, it should always return default + rmNode.setPhysicalResource(null); + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 0.1F, hbDefault); // 10% + calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax, + speedup, slowdown, vcoreUnit * 1.0F, hbDefault); // 100% + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java index c1703bc52e3..14eca5ae5e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java @@ -24,11 +24,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -40,12 +42,19 @@ import static org.junit.Assert.assertEquals; */ public class TestClusterNodeTracker { private ClusterNodeTracker nodeTracker; + private ClusterMetrics metrics; @Before public void setup() { + metrics = ClusterMetrics.getMetrics(); nodeTracker = new ClusterNodeTracker<>(); } + @After + public void teardown() { + ClusterMetrics.destroy(); + } + private void addEight4x4Nodes() { MockNodes.resetHostIds(); List rmNodes = @@ -65,6 +74,15 @@ public class TestClusterNodeTracker { 4, nodeTracker.nodeCount("rack0")); } + @Test + public void testIncrCapability() { + addEight4x4Nodes(); + assertEquals("Cluster Capability Memory incorrect", + metrics.getCapabilityMB(), (4096 * 8)); + assertEquals("Cluster Capability Vcores incorrect", + metrics.getCapabilityVirtualCores(), 4 * 8); + } + @Test public void testGetNodesForResourceName() throws Exception { addEight4x4Nodes(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md index dd7a1957f04..5e643a69b8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md @@ -146,4 +146,22 @@ The following settings need to be set in *yarn-site.xml*. yarn.nodemanager.aux-services.mapreduce_shuffle.class org.apache.hadoop.mapred.ShuffleHandler - \ No newline at end of file + + +Scale Heart-beat Interval Based on CPU Utilization +------------------------------------------------- + +This allows a cluster admin to configure a cluster to allow the heart-beat between the Resource Manager and each NodeManager to be scaled based on the CPU utilization of the node compared to the overall CPU utilization of the cluster. + +### Configuration + +The following parameters can be used to configure the heart-beat interval and whether and how it scales. + +| Configuration Name | Allowed Values | Description | +|:---- |:---- |:---- | +| `yarn.resourcemanager.nodemanagers.heartbeat-interval-ms` | Long | Specifies the default heart-beat interval in milliseconds for every NodeManager in the cluster. Default is 1000 ms. | +| `yarn.resourcemanager.nodemanagers.heartbeat-interval-scaling-enable` | true, false | Enables heart-beat interval scaling. If true, The NodeManager heart-beat interval will scale based on the difference between the CPU utilization on the node and the cluster-wide average CPU utilization. Default is false. | +| `yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms` | Positive Long | If heart-beat interval scaling is enabled, this is the minimum heart-beat interval in milliseconds. Default is 1000 ms. | +| `yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms` | Positive Long | If heart-beat interval scaling is enabled, this is the maximum heart-beat interval in milliseconds. Default is 1000 ms. | +| `yarn.resourcemanager.nodemanagers.heartbeat-interval-speedup-factor` | Positive Float | If heart-beat interval scaling is enabled, this controls the degree of adjustment when speeding up heartbeat intervals. At 1.0, 20% less than the average cluster-wide CPU utilization will result in a 20% decrease in the heartbeat interval. Default is 1.0. | +| `yarn.resourcemanager.nodemanagers.heartbeat-interval-slowdown-factor` | Positive Float | If heart-beat interval scaling is enabled, this controls the degree of adjustment when slowing down heartbeat intervals. At 1.0, 20% greater than the average cluster-wide CPU utilization will result in a 20% increase in the heartbeat interval. Default is 1.0. |