YARN-9322. Store metrics for custom resource types into FSQueueMetrics and query them in FairSchedulerQueueInfo
(Contributed by Szilard Nemeth via Daniel Templeton) Change-Id: I14c12f1265999d62102f2ec5506d90015efeefe8
This commit is contained in:
parent
1779fc57a1
commit
7b928f19a4
|
@ -0,0 +1,113 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is a main entry-point for any kind of metrics for
|
||||||
|
* custom resources.
|
||||||
|
* It provides increase and decrease methods for all types of metrics.
|
||||||
|
*/
|
||||||
|
public class FSQueueMetricsForCustomResources {
|
||||||
|
private final QueueMetricsCustomResource fairShare =
|
||||||
|
new QueueMetricsCustomResource();
|
||||||
|
private final QueueMetricsCustomResource steadyFairShare =
|
||||||
|
new QueueMetricsCustomResource();
|
||||||
|
private final QueueMetricsCustomResource minShare =
|
||||||
|
new QueueMetricsCustomResource();
|
||||||
|
private final QueueMetricsCustomResource maxShare =
|
||||||
|
new QueueMetricsCustomResource();
|
||||||
|
private final QueueMetricsCustomResource maxAMShare =
|
||||||
|
new QueueMetricsCustomResource();
|
||||||
|
private final QueueMetricsCustomResource amResourceUsage =
|
||||||
|
new QueueMetricsCustomResource();
|
||||||
|
|
||||||
|
public QueueMetricsCustomResource getFairShare() {
|
||||||
|
return fairShare;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFairShare(Resource res) {
|
||||||
|
fairShare.set(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Long> getFairShareValues() {
|
||||||
|
return fairShare.getValues();
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueMetricsCustomResource getSteadyFairShare() {
|
||||||
|
return steadyFairShare;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSteadyFairShare(Resource res) {
|
||||||
|
steadyFairShare.set(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Long> getSteadyFairShareValues() {
|
||||||
|
return steadyFairShare.getValues();
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueMetricsCustomResource getMinShare() {
|
||||||
|
return minShare;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMinShare(Resource res) {
|
||||||
|
minShare.set(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Long> getMinShareValues() {
|
||||||
|
return minShare.getValues();
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueMetricsCustomResource getMaxShare() {
|
||||||
|
return maxShare;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxShare(Resource res) {
|
||||||
|
maxShare.set(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Long> getMaxShareValues() {
|
||||||
|
return maxShare.getValues();
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueMetricsCustomResource getMaxAMShare() {
|
||||||
|
return maxAMShare;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxAMShare(Resource res) {
|
||||||
|
maxAMShare.set(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Long> getMaxAMShareValues() {
|
||||||
|
return maxAMShare.getValues();
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueMetricsCustomResource getAMResourceUsage() {
|
||||||
|
return amResourceUsage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAMResourceUsage(Resource res) {
|
||||||
|
amResourceUsage.set(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Long> getAMResourceUsageValues() {
|
||||||
|
return amResourceUsage.getValues();
|
||||||
|
}
|
||||||
|
}
|
|
@ -46,8 +46,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
|
||||||
.QueueMetricsForCustomResources.QueueMetricsCustomResource;
|
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class that holds metrics values for custom resources in a map keyed with
|
||||||
|
* the name of the custom resource.
|
||||||
|
* There are different kinds of values like allocated, available and others.
|
||||||
|
*/
|
||||||
|
public class QueueMetricsCustomResource {
|
||||||
|
private final Map<String, Long> values = Maps.newHashMap();
|
||||||
|
|
||||||
|
protected void increase(Resource res) {
|
||||||
|
update(res, Long::sum);
|
||||||
|
}
|
||||||
|
|
||||||
|
void increaseWithMultiplier(Resource res, long multiplier) {
|
||||||
|
update(res, (v1, v2) -> v1 + v2 * multiplier);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void decrease(Resource res) {
|
||||||
|
update(res, (v1, v2) -> v1 - v2);
|
||||||
|
}
|
||||||
|
|
||||||
|
void decreaseWithMultiplier(Resource res, int containers) {
|
||||||
|
update(res, (v1, v2) -> v1 - v2 * containers);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void set(Resource res) {
|
||||||
|
update(res, (v1, v2) -> v2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void update(Resource res, BiFunction<Long, Long, Long> operation) {
|
||||||
|
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
|
||||||
|
ResourceInformation[] resources = res.getResources();
|
||||||
|
|
||||||
|
for (int i = 2; i < resources.length; i++) {
|
||||||
|
ResourceInformation resource = resources[i];
|
||||||
|
|
||||||
|
// Map.merge only applies operation if there is
|
||||||
|
// a value for the key in the map
|
||||||
|
if (!values.containsKey(resource.getName())) {
|
||||||
|
values.put(resource.getName(), 0L);
|
||||||
|
}
|
||||||
|
values.merge(resource.getName(),
|
||||||
|
resource.getValue(), operation);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Long> getValues() {
|
||||||
|
return values;
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,13 +16,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
import com.google.common.collect.Maps;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.BiFunction;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is a main entry-point for any kind of metrics for
|
* This class is a main entry-point for any kind of metrics for
|
||||||
|
@ -30,56 +26,6 @@ import java.util.function.BiFunction;
|
||||||
* It provides increase and decrease methods for all types of metrics.
|
* It provides increase and decrease methods for all types of metrics.
|
||||||
*/
|
*/
|
||||||
public class QueueMetricsForCustomResources {
|
public class QueueMetricsForCustomResources {
|
||||||
/**
|
|
||||||
* Class that holds metrics values for custom resources in a map keyed with
|
|
||||||
* the name of the custom resource.
|
|
||||||
* There are different kinds of values like allocated, available and others.
|
|
||||||
*/
|
|
||||||
public static class QueueMetricsCustomResource {
|
|
||||||
private final Map<String, Long> values = Maps.newHashMap();
|
|
||||||
|
|
||||||
protected void increase(Resource res) {
|
|
||||||
update(res, Long::sum);
|
|
||||||
}
|
|
||||||
|
|
||||||
void increaseWithMultiplier(Resource res, long multiplier) {
|
|
||||||
update(res, (v1, v2) -> v1 + v2 * multiplier);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void decrease(Resource res) {
|
|
||||||
update(res, (v1, v2) -> v1 - v2);
|
|
||||||
}
|
|
||||||
|
|
||||||
void decreaseWithMultiplier(Resource res, int containers) {
|
|
||||||
update(res, (v1, v2) -> v1 - v2 * containers);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void set(Resource res) {
|
|
||||||
update(res, (v1, v2) -> v2);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void update(Resource res, BiFunction<Long, Long, Long> operation) {
|
|
||||||
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
|
|
||||||
ResourceInformation[] resources = res.getResources();
|
|
||||||
|
|
||||||
for (int i = 2; i < resources.length; i++) {
|
|
||||||
ResourceInformation resource = resources[i];
|
|
||||||
|
|
||||||
// Map.merge only applies operation if there is
|
|
||||||
// a value for the key in the map
|
|
||||||
if (!values.containsKey(resource.getName())) {
|
|
||||||
values.put(resource.getName(), 0L);
|
|
||||||
}
|
|
||||||
values.merge(resource.getName(),
|
|
||||||
resource.getValue(), operation);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map<String, Long> getValues() {
|
|
||||||
return values;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
private final QueueMetricsCustomResource aggregatePreemptedSeconds =
|
private final QueueMetricsCustomResource aggregatePreemptedSeconds =
|
||||||
new QueueMetricsCustomResource();
|
new QueueMetricsCustomResource();
|
||||||
private final QueueMetricsCustomResource allocated =
|
private final QueueMetricsCustomResource allocated =
|
||||||
|
|
|
@ -27,8 +27,10 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.FSQueueMetricsForCustomResources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
|
|
||||||
@Metrics(context="yarn")
|
@Metrics(context="yarn")
|
||||||
public class FSQueueMetrics extends QueueMetrics {
|
public class FSQueueMetrics extends QueueMetrics {
|
||||||
|
@ -47,29 +49,45 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||||
@Metric("AM resource usage of memory in MB") MutableGaugeLong amResourceUsageMB;
|
@Metric("AM resource usage of memory in MB") MutableGaugeLong amResourceUsageMB;
|
||||||
@Metric("AM resource usage of CPU in vcores") MutableGaugeInt amResourceUsageVCores;
|
@Metric("AM resource usage of CPU in vcores") MutableGaugeInt amResourceUsageVCores;
|
||||||
|
|
||||||
|
private final FSQueueMetricsForCustomResources customResources;
|
||||||
private String schedulingPolicy;
|
private String schedulingPolicy;
|
||||||
|
|
||||||
FSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
FSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
||||||
boolean enableUserMetrics, Configuration conf) {
|
boolean enableUserMetrics, Configuration conf) {
|
||||||
super(ms, queueName, parent, enableUserMetrics, conf);
|
super(ms, queueName, parent, enableUserMetrics, conf);
|
||||||
|
|
||||||
|
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
|
||||||
|
this.customResources =
|
||||||
|
new FSQueueMetricsForCustomResources();
|
||||||
|
} else {
|
||||||
|
this.customResources = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
long getFairShareMB() {
|
||||||
|
return fairShareMB.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
long getFairShareVirtualCores() {
|
||||||
|
return fairShareVCores.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getFairShare() {
|
||||||
|
if (customResources != null) {
|
||||||
|
return Resource.newInstance(fairShareMB.value(),
|
||||||
|
(int) fairShareVCores.value(),
|
||||||
|
customResources.getFairShareValues());
|
||||||
|
}
|
||||||
|
return Resource.newInstance(fairShareMB.value(),
|
||||||
|
(int) fairShareVCores.value());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setFairShare(Resource resource) {
|
public void setFairShare(Resource resource) {
|
||||||
fairShareMB.set(resource.getMemorySize());
|
fairShareMB.set(resource.getMemorySize());
|
||||||
fairShareVCores.set(resource.getVirtualCores());
|
fairShareVCores.set(resource.getVirtualCores());
|
||||||
|
if (customResources != null) {
|
||||||
|
customResources.setFairShare(resource);
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getFairShareMB() {
|
|
||||||
return fairShareMB.value();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getFairShareVirtualCores() {
|
|
||||||
return fairShareVCores.value();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setSteadyFairShare(Resource resource) {
|
|
||||||
steadyFairShareMB.set(resource.getMemorySize());
|
|
||||||
steadyFairShareVCores.set(resource.getVirtualCores());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getSteadyFairShareMB() {
|
public long getSteadyFairShareMB() {
|
||||||
|
@ -80,9 +98,22 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||||
return steadyFairShareVCores.value();
|
return steadyFairShareVCores.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMinShare(Resource resource) {
|
public Resource getSteadyFairShare() {
|
||||||
minShareMB.set(resource.getMemorySize());
|
if (customResources != null) {
|
||||||
minShareVCores.set(resource.getVirtualCores());
|
return Resource.newInstance(steadyFairShareMB.value(),
|
||||||
|
(int) steadyFairShareVCores.value(),
|
||||||
|
customResources.getSteadyFairShareValues());
|
||||||
|
}
|
||||||
|
return Resource.newInstance(steadyFairShareMB.value(),
|
||||||
|
(int) steadyFairShareVCores.value());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSteadyFairShare(Resource resource) {
|
||||||
|
steadyFairShareMB.set(resource.getMemorySize());
|
||||||
|
steadyFairShareVCores.set(resource.getVirtualCores());
|
||||||
|
if (customResources != null) {
|
||||||
|
customResources.setSteadyFairShare(resource);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getMinShareMB() {
|
public long getMinShareMB() {
|
||||||
|
@ -93,9 +124,22 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||||
return minShareVCores.value();
|
return minShareVCores.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMaxShare(Resource resource) {
|
public Resource getMinShare() {
|
||||||
maxShareMB.set(resource.getMemorySize());
|
if (customResources != null) {
|
||||||
maxShareVCores.set(resource.getVirtualCores());
|
return Resource.newInstance(minShareMB.value(),
|
||||||
|
(int) minShareVCores.value(),
|
||||||
|
customResources.getMinShareValues());
|
||||||
|
}
|
||||||
|
return Resource.newInstance(minShareMB.value(),
|
||||||
|
(int) minShareVCores.value());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMinShare(Resource resource) {
|
||||||
|
minShareMB.set(resource.getMemorySize());
|
||||||
|
minShareVCores.set(resource.getVirtualCores());
|
||||||
|
if (customResources != null) {
|
||||||
|
customResources.setMinShare(resource);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getMaxShareMB() {
|
public long getMaxShareMB() {
|
||||||
|
@ -106,6 +150,24 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||||
return maxShareVCores.value();
|
return maxShareVCores.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Resource getMaxShare() {
|
||||||
|
if (customResources != null) {
|
||||||
|
return Resource.newInstance(maxShareMB.value(),
|
||||||
|
(int) maxShareVCores.value(),
|
||||||
|
customResources.getMaxShareValues());
|
||||||
|
}
|
||||||
|
return Resource.newInstance(maxShareMB.value(),
|
||||||
|
(int) maxShareVCores.value());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxShare(Resource resource) {
|
||||||
|
maxShareMB.set(resource.getMemorySize());
|
||||||
|
maxShareVCores.set(resource.getVirtualCores());
|
||||||
|
if (customResources != null) {
|
||||||
|
customResources.setMaxShare(resource);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public int getMaxApps() {
|
public int getMaxApps() {
|
||||||
return maxApps.value();
|
return maxApps.value();
|
||||||
}
|
}
|
||||||
|
@ -132,6 +194,16 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||||
return maxAMShareVCores.value();
|
return maxAMShareVCores.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Resource getMaxAMShare() {
|
||||||
|
if (customResources != null) {
|
||||||
|
return Resource.newInstance(maxAMShareMB.value(),
|
||||||
|
maxAMShareVCores.value(),
|
||||||
|
customResources.getMaxAMShareValues());
|
||||||
|
}
|
||||||
|
return Resource.newInstance(maxAMShareMB.value(),
|
||||||
|
maxAMShareVCores.value());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the maximum resource AM can use.
|
* Set the maximum resource AM can use.
|
||||||
*
|
*
|
||||||
|
@ -140,6 +212,9 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||||
public void setMaxAMShare(Resource resource) {
|
public void setMaxAMShare(Resource resource) {
|
||||||
maxAMShareMB.set(resource.getMemorySize());
|
maxAMShareMB.set(resource.getMemorySize());
|
||||||
maxAMShareVCores.set(resource.getVirtualCores());
|
maxAMShareVCores.set(resource.getVirtualCores());
|
||||||
|
if (customResources != null) {
|
||||||
|
customResources.setMaxAMShare(resource);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -160,6 +235,16 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||||
return amResourceUsageVCores.value();
|
return amResourceUsageVCores.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Resource getAMResourceUsage() {
|
||||||
|
if (customResources != null) {
|
||||||
|
return Resource.newInstance(amResourceUsageMB.value(),
|
||||||
|
amResourceUsageVCores.value(),
|
||||||
|
customResources.getAMResourceUsageValues());
|
||||||
|
}
|
||||||
|
return Resource.newInstance(amResourceUsageMB.value(),
|
||||||
|
amResourceUsageVCores.value());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the AM resource usage.
|
* Set the AM resource usage.
|
||||||
*
|
*
|
||||||
|
@ -168,6 +253,9 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||||
public void setAMResourceUsage(Resource resource) {
|
public void setAMResourceUsage(Resource resource) {
|
||||||
amResourceUsageMB.set(resource.getMemorySize());
|
amResourceUsageMB.set(resource.getMemorySize());
|
||||||
amResourceUsageVCores.set(resource.getVirtualCores());
|
amResourceUsageVCores.set(resource.getVirtualCores());
|
||||||
|
if (customResources != null) {
|
||||||
|
customResources.setAMResourceUsage(resource);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -27,8 +27,6 @@ import javax.xml.bind.annotation.XmlAccessorType;
|
||||||
import javax.xml.bind.annotation.XmlRootElement;
|
import javax.xml.bind.annotation.XmlRootElement;
|
||||||
import javax.xml.bind.annotation.XmlSeeAlso;
|
import javax.xml.bind.annotation.XmlSeeAlso;
|
||||||
import javax.xml.bind.annotation.XmlTransient;
|
import javax.xml.bind.annotation.XmlTransient;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||||
|
@ -83,12 +81,8 @@ public class FairSchedulerQueueInfo {
|
||||||
|
|
||||||
clusterResources = new ResourceInfo(scheduler.getClusterResource());
|
clusterResources = new ResourceInfo(scheduler.getClusterResource());
|
||||||
|
|
||||||
amUsedResources = new ResourceInfo(Resource.newInstance(
|
amUsedResources = new ResourceInfo(queue.getMetrics().getAMResourceUsage());
|
||||||
queue.getMetrics().getAMResourceUsageMB(),
|
amMaxResources = new ResourceInfo(queue.getMetrics().getMaxAMShare());
|
||||||
queue.getMetrics().getAMResourceUsageVCores()));
|
|
||||||
amMaxResources = new ResourceInfo(Resource.newInstance(
|
|
||||||
queue.getMetrics().getMaxAMShareMB(),
|
|
||||||
queue.getMetrics().getMaxAMShareVCores()));
|
|
||||||
usedResources = new ResourceInfo(queue.getResourceUsage());
|
usedResources = new ResourceInfo(queue.getResourceUsage());
|
||||||
demandResources = new ResourceInfo(queue.getDemand());
|
demandResources = new ResourceInfo(queue.getDemand());
|
||||||
fractionMemUsed = (float)usedResources.getMemorySize() /
|
fractionMemUsed = (float)usedResources.getMemorySize() /
|
||||||
|
|
|
@ -45,7 +45,6 @@ public class ResourceInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ResourceInfo(Resource res) {
|
public ResourceInfo(Resource res) {
|
||||||
// Make sure no NPE.
|
|
||||||
if (res != null) {
|
if (res != null) {
|
||||||
memory = res.getMemorySize();
|
memory = res.getMemorySize();
|
||||||
vCores = res.getVirtualCores();
|
vCores = res.getVirtualCores();
|
||||||
|
|
|
@ -30,8 +30,6 @@ import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
|
||||||
.QueueMetricsForCustomResources.QueueMetricsCustomResource;
|
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
|
@ -18,17 +18,23 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.metrics2.MetricsSource;
|
import org.apache.hadoop.metrics2.MetricsSource;
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
||||||
import org.apache.hadoop.metrics2.impl.MetricsRecords;
|
import org.apache.hadoop.metrics2.impl.MetricsRecords;
|
||||||
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The test class for {@link FSQueueMetrics}.
|
* The test class for {@link FSQueueMetrics}.
|
||||||
*/
|
*/
|
||||||
|
@ -36,12 +42,26 @@ public class TestFSQueueMetrics {
|
||||||
private static final Configuration CONF = new Configuration();
|
private static final Configuration CONF = new Configuration();
|
||||||
|
|
||||||
private MetricsSystem ms;
|
private MetricsSystem ms;
|
||||||
|
private static final String RESOURCE_NAME = "test1";
|
||||||
|
private static final String QUEUE_NAME = "single";
|
||||||
|
|
||||||
@Before public void setUp() {
|
@Before
|
||||||
|
public void setUp() {
|
||||||
ms = new MetricsSystemImpl();
|
ms = new MetricsSystemImpl();
|
||||||
QueueMetrics.clearQueueMetrics();
|
QueueMetrics.clearQueueMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private FSQueueMetrics setupMetrics(String resourceName) {
|
||||||
|
CONF.set(YarnConfiguration.RESOURCE_TYPES, resourceName);
|
||||||
|
ResourceUtils.resetResourceTypes(CONF);
|
||||||
|
|
||||||
|
return FSQueueMetrics.forQueue(ms, QUEUE_NAME, null, false, CONF);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getErrorMessage(String metricsType) {
|
||||||
|
return metricsType + " is not the expected!";
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test if the metric scheduling policy is set correctly.
|
* Test if the metric scheduling policy is set correctly.
|
||||||
*/
|
*/
|
||||||
|
@ -66,4 +86,229 @@ public class TestFSQueueMetrics {
|
||||||
MetricsRecords.assertTag(collector.getRecords().get(0), "SchedulingPolicy",
|
MetricsRecords.assertTag(collector.getRecords().get(0), "SchedulingPolicy",
|
||||||
policy);
|
policy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetFairShare() {
|
||||||
|
FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);
|
||||||
|
|
||||||
|
Resource res = Resource.newInstance(2048L, 4, ImmutableMap.of(RESOURCE_NAME,
|
||||||
|
20L));
|
||||||
|
metrics.setFairShare(res);
|
||||||
|
|
||||||
|
assertEquals(getErrorMessage("fairShareMB"),
|
||||||
|
2048L, metrics.getFairShareMB());
|
||||||
|
assertEquals(getErrorMessage("fairShareVcores"),
|
||||||
|
4L, metrics.getFairShareVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("fairShareMB"),
|
||||||
|
2048L, metrics.getFairShare().getMemorySize());
|
||||||
|
assertEquals(getErrorMessage("fairShareVcores"),
|
||||||
|
4L, metrics.getFairShare().getVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("fairShare for resource: " + RESOURCE_NAME),
|
||||||
|
20L, metrics.getFairShare().getResourceValue(RESOURCE_NAME));
|
||||||
|
|
||||||
|
res = Resource.newInstance(2049L, 5);
|
||||||
|
metrics.setFairShare(res);
|
||||||
|
|
||||||
|
assertEquals(getErrorMessage("fairShareMB"),
|
||||||
|
2049L, metrics.getFairShareMB());
|
||||||
|
assertEquals(getErrorMessage("fairShareVcores"),
|
||||||
|
5L, metrics.getFairShareVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("fairShareMB"),
|
||||||
|
2049L, metrics.getFairShare().getMemorySize());
|
||||||
|
assertEquals(getErrorMessage("fairShareVcores"),
|
||||||
|
5L, metrics.getFairShare().getVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("fairShare for resource: " + RESOURCE_NAME),
|
||||||
|
0, metrics.getFairShare().getResourceValue(RESOURCE_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetSteadyFairShare() {
|
||||||
|
FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);
|
||||||
|
|
||||||
|
Resource res = Resource.newInstance(2048L, 4, ImmutableMap.of(RESOURCE_NAME,
|
||||||
|
20L));
|
||||||
|
metrics.setSteadyFairShare(res);
|
||||||
|
|
||||||
|
assertEquals(getErrorMessage("steadyFairShareMB"),
|
||||||
|
2048L, metrics.getSteadyFairShareMB());
|
||||||
|
assertEquals(getErrorMessage("steadyFairShareVcores"),
|
||||||
|
4L, metrics.getSteadyFairShareVCores());
|
||||||
|
|
||||||
|
Resource steadyFairShare = metrics.getSteadyFairShare();
|
||||||
|
assertEquals(getErrorMessage("steadyFairShareMB"),
|
||||||
|
2048L, steadyFairShare.getMemorySize());
|
||||||
|
assertEquals(getErrorMessage("steadyFairShareVcores"),
|
||||||
|
4L, steadyFairShare.getVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("steadyFairShare for resource: " +
|
||||||
|
RESOURCE_NAME),
|
||||||
|
20L, steadyFairShare.getResourceValue(RESOURCE_NAME));
|
||||||
|
|
||||||
|
res = Resource.newInstance(2049L, 5);
|
||||||
|
metrics.setSteadyFairShare(res);
|
||||||
|
|
||||||
|
assertEquals(getErrorMessage("steadyFairShareMB"),
|
||||||
|
2049L, metrics.getSteadyFairShareMB());
|
||||||
|
assertEquals(getErrorMessage("steadyFairShareVcores"),
|
||||||
|
5L, metrics.getSteadyFairShareVCores());
|
||||||
|
|
||||||
|
steadyFairShare = metrics.getSteadyFairShare();
|
||||||
|
assertEquals(getErrorMessage("steadyFairShareMB"),
|
||||||
|
2049L, steadyFairShare.getMemorySize());
|
||||||
|
assertEquals(getErrorMessage("steadyFairShareVcores"),
|
||||||
|
5L, steadyFairShare.getVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("steadyFairShare for resource: " +
|
||||||
|
RESOURCE_NAME),
|
||||||
|
0, steadyFairShare.getResourceValue(RESOURCE_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetMinShare() {
|
||||||
|
FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);
|
||||||
|
|
||||||
|
Resource res = Resource.newInstance(2048L, 4, ImmutableMap.of(RESOURCE_NAME,
|
||||||
|
20L));
|
||||||
|
metrics.setMinShare(res);
|
||||||
|
|
||||||
|
assertEquals(getErrorMessage("minShareMB"),
|
||||||
|
2048L, metrics.getMinShareMB());
|
||||||
|
assertEquals(getErrorMessage("minShareVcores"),
|
||||||
|
4L, metrics.getMinShareVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("minShareMB"),
|
||||||
|
2048L, metrics.getMinShare().getMemorySize());
|
||||||
|
assertEquals(getErrorMessage("minShareVcores"),
|
||||||
|
4L, metrics.getMinShare().getVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("minShare for resource: " + RESOURCE_NAME),
|
||||||
|
20L, metrics.getMinShare().getResourceValue(RESOURCE_NAME));
|
||||||
|
|
||||||
|
res = Resource.newInstance(2049L, 5);
|
||||||
|
metrics.setMinShare(res);
|
||||||
|
|
||||||
|
assertEquals(getErrorMessage("minShareMB"),
|
||||||
|
2049L, metrics.getMinShareMB());
|
||||||
|
assertEquals(getErrorMessage("minShareVcores"),
|
||||||
|
5L, metrics.getMinShareVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("minShareMB"),
|
||||||
|
2049L, metrics.getMinShare().getMemorySize());
|
||||||
|
assertEquals(getErrorMessage("minShareVcores"),
|
||||||
|
5L, metrics.getMinShare().getVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("minShare for resource: " + RESOURCE_NAME),
|
||||||
|
0, metrics.getMinShare().getResourceValue(RESOURCE_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetMaxShare() {
|
||||||
|
FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);
|
||||||
|
|
||||||
|
Resource res = Resource.newInstance(2048L, 4, ImmutableMap.of(RESOURCE_NAME,
|
||||||
|
20L));
|
||||||
|
metrics.setMaxShare(res);
|
||||||
|
|
||||||
|
assertEquals(getErrorMessage("maxShareMB"),
|
||||||
|
2048L, metrics.getMaxShareMB());
|
||||||
|
assertEquals(getErrorMessage("maxShareVcores"),
|
||||||
|
4L, metrics.getMaxShareVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("maxShareMB"),
|
||||||
|
2048L, metrics.getMaxShare().getMemorySize());
|
||||||
|
assertEquals(getErrorMessage("maxShareVcores"),
|
||||||
|
4L, metrics.getMaxShare().getVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("maxShare for resource: " + RESOURCE_NAME),
|
||||||
|
20L, metrics.getMaxShare().getResourceValue(RESOURCE_NAME));
|
||||||
|
|
||||||
|
res = Resource.newInstance(2049L, 5);
|
||||||
|
metrics.setMaxShare(res);
|
||||||
|
|
||||||
|
assertEquals(getErrorMessage("maxShareMB"),
|
||||||
|
2049L, metrics.getMaxShareMB());
|
||||||
|
assertEquals(getErrorMessage("maxShareVcores"),
|
||||||
|
5L, metrics.getMaxShareVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("maxShareMB"),
|
||||||
|
2049L, metrics.getMaxShare().getMemorySize());
|
||||||
|
assertEquals(getErrorMessage("maxShareVcores"),
|
||||||
|
5L, metrics.getMaxShare().getVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("maxShare for resource: " + RESOURCE_NAME),
|
||||||
|
0, metrics.getMaxShare().getResourceValue(RESOURCE_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetMaxAMShare() {
|
||||||
|
FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);
|
||||||
|
|
||||||
|
Resource res = Resource.newInstance(2048L, 4, ImmutableMap.of(RESOURCE_NAME,
|
||||||
|
20L));
|
||||||
|
metrics.setMaxAMShare(res);
|
||||||
|
|
||||||
|
assertEquals(getErrorMessage("maxAMShareMB"),
|
||||||
|
2048L, metrics.getMaxAMShareMB());
|
||||||
|
assertEquals(getErrorMessage("maxAMShareVcores"),
|
||||||
|
4L, metrics.getMaxAMShareVCores());
|
||||||
|
assertEquals(getErrorMessage("maxAMShareMB"),
|
||||||
|
2048L, metrics.getMaxAMShare().getMemorySize());
|
||||||
|
assertEquals(getErrorMessage("maxAMShareVcores"),
|
||||||
|
4L, metrics.getMaxAMShare().getVirtualCores());
|
||||||
|
assertEquals(getErrorMessage(
|
||||||
|
"maxAMShare for resource: " + RESOURCE_NAME),
|
||||||
|
20L, metrics.getMaxAMShare().getResourceValue(RESOURCE_NAME));
|
||||||
|
|
||||||
|
res = Resource.newInstance(2049L, 5);
|
||||||
|
metrics.setMaxAMShare(res);
|
||||||
|
|
||||||
|
assertEquals(getErrorMessage("maxAMShareMB"),
|
||||||
|
2049L, metrics.getMaxAMShareMB());
|
||||||
|
assertEquals(getErrorMessage("maxAMShareVcores"),
|
||||||
|
5L, metrics.getMaxAMShareVCores());
|
||||||
|
assertEquals(getErrorMessage("maxAMShareMB"),
|
||||||
|
2049L, metrics.getMaxAMShare().getMemorySize());
|
||||||
|
assertEquals(getErrorMessage("maxAMShareVcores"),
|
||||||
|
5L, metrics.getMaxAMShare().getVirtualCores());
|
||||||
|
assertEquals(getErrorMessage(
|
||||||
|
"maxAMShare for resource: " + RESOURCE_NAME),
|
||||||
|
0, metrics.getMaxAMShare().getResourceValue(RESOURCE_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetAMResourceUsage() {
|
||||||
|
FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);
|
||||||
|
|
||||||
|
Resource res = Resource.newInstance(2048L, 4, ImmutableMap.of(RESOURCE_NAME,
|
||||||
|
20L));
|
||||||
|
metrics.setAMResourceUsage(res);
|
||||||
|
|
||||||
|
assertEquals(getErrorMessage("AMResourceUsageMB"),
|
||||||
|
2048L, metrics.getAMResourceUsageMB());
|
||||||
|
assertEquals(getErrorMessage("AMResourceUsageVcores"),
|
||||||
|
4L, metrics.getAMResourceUsageVCores());
|
||||||
|
|
||||||
|
Resource amResourceUsage = metrics.getAMResourceUsage();
|
||||||
|
assertEquals(getErrorMessage("AMResourceUsageMB"),
|
||||||
|
2048L, amResourceUsage.getMemorySize());
|
||||||
|
assertEquals(getErrorMessage("AMResourceUsageVcores"),
|
||||||
|
4L, amResourceUsage.getVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("AMResourceUsage for resource: " +
|
||||||
|
RESOURCE_NAME),
|
||||||
|
20L, amResourceUsage.getResourceValue(RESOURCE_NAME));
|
||||||
|
|
||||||
|
res = Resource.newInstance(2049L, 5);
|
||||||
|
metrics.setAMResourceUsage(res);
|
||||||
|
|
||||||
|
assertEquals(getErrorMessage("AMResourceUsageMB"),
|
||||||
|
2049L, metrics.getAMResourceUsageMB());
|
||||||
|
assertEquals(getErrorMessage("AMResourceUsageVcores"),
|
||||||
|
5L, metrics.getAMResourceUsageVCores());
|
||||||
|
|
||||||
|
amResourceUsage = metrics.getAMResourceUsage();
|
||||||
|
assertEquals(getErrorMessage("AMResourceUsageMB"),
|
||||||
|
2049L, amResourceUsage.getMemorySize());
|
||||||
|
assertEquals(getErrorMessage("AMResourceUsageVcores"),
|
||||||
|
5L, amResourceUsage.getVirtualCores());
|
||||||
|
assertEquals(getErrorMessage("AMResourceUsage for resource: " +
|
||||||
|
RESOURCE_NAME),
|
||||||
|
0, amResourceUsage.getResourceValue(RESOURCE_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetMaxApps() {
|
||||||
|
FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);
|
||||||
|
metrics.setMaxApps(25);
|
||||||
|
assertEquals(getErrorMessage("maxApps"), 25L, metrics.getMaxApps());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue