YARN-10519. Refactor QueueMetricsForCustomResources class to move to yarn-common package. Contributed by Minni Mittal

This commit is contained in:
bibinchundatt 2021-01-20 11:00:04 +05:30
parent 45b3a84709
commit 8bc2dfbf36
10 changed files with 270 additions and 177 deletions

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.metrics;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -29,26 +29,26 @@
* the name of the custom resource. * the name of the custom resource.
* There are different kinds of values like allocated, available and others. * There are different kinds of values like allocated, available and others.
*/ */
public class QueueMetricsCustomResource { public class CustomResourceMetricValue {
private final Map<String, Long> values = Maps.newHashMap(); private final Map<String, Long> values = Maps.newHashMap();
protected void increase(Resource res) { public void increase(Resource res) {
update(res, Long::sum); update(res, Long::sum);
} }
void increaseWithMultiplier(Resource res, long multiplier) { public void increaseWithMultiplier(Resource res, long multiplier) {
update(res, (v1, v2) -> v1 + v2 * multiplier); update(res, (v1, v2) -> v1 + v2 * multiplier);
} }
protected void decrease(Resource res) { public void decrease(Resource res) {
update(res, (v1, v2) -> v1 - v2); update(res, (v1, v2) -> v1 - v2);
} }
void decreaseWithMultiplier(Resource res, int containers) { public void decreaseWithMultiplier(Resource res, int containers) {
update(res, (v1, v2) -> v1 - v2 * containers); update(res, (v1, v2) -> v1 - v2 * containers);
} }
protected void set(Resource res) { public void set(Resource res) {
update(res, (v1, v2) -> v2); update(res, (v1, v2) -> v2);
} }
@ -64,8 +64,7 @@ private void update(Resource res, BiFunction<Long, Long, Long> operation) {
if (!values.containsKey(resource.getName())) { if (!values.containsKey(resource.getName())) {
values.put(resource.getName(), 0L); values.put(resource.getName(), 0L);
} }
values.merge(resource.getName(), values.merge(resource.getName(), resource.getValue(), operation);
resource.getValue(), operation);
} }
} }
} }

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.metrics;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.util.HashMap;
import java.util.Map;
/**
* This is base class for allocated and available metrics for
* custom resources.
*/
public class CustomResourceMetrics {
private static final String ALLOCATED_RESOURCE_METRIC_PREFIX =
"AllocatedResource.";
private static final String ALLOCATED_RESOURCE_METRIC_DESC = "Allocated NAME";
private static final String AVAILABLE_RESOURCE_METRIC_PREFIX =
"AvailableResource.";
private static final String AVAILABLE_RESOURCE_METRIC_DESC = "Available NAME";
private final CustomResourceMetricValue allocated =
new CustomResourceMetricValue();
private final CustomResourceMetricValue available =
new CustomResourceMetricValue();
/**
* Register all custom resources metrics as part of initialization.
* @param customResources Map containing all custom resource types
* @param registry of the metric type
*/
public void registerCustomResources(Map<String, Long> customResources,
MetricsRegistry registry) {
registerCustomResources(customResources, registry,
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
registerCustomResources(customResources, registry,
AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
}
/**
* Get a map of all custom resource metric.
* @return map of custom resource
*/
public Map<String, Long> initAndGetCustomResources() {
Map<String, Long> customResources = new HashMap<String, Long>();
ResourceInformation[] resources = ResourceUtils.getResourceTypesArray();
for (int i = 2; i < resources.length; i++) {
ResourceInformation resource = resources[i];
customResources.put(resource.getName(), Long.valueOf(0));
}
return customResources;
}
/**
* As and when this metric object construction happens for any queue, all
* custom resource metrics value would be initialized with '0' like any other
* mandatory resources metrics.
* @param customResources Map containing all custom resource types
* @param registry of the metric type
* @param metricPrefix prefix in metric name
* @param metricDesc suffix for metric name
*/
public void registerCustomResources(Map<String, Long> customResources,
MetricsRegistry registry, String metricPrefix, String metricDesc) {
for (Map.Entry<String, Long> entry : customResources.entrySet()) {
String resourceName = entry.getKey();
Long resourceValue = entry.getValue();
MutableGaugeLong resourceMetric =
(MutableGaugeLong) registry.get(metricPrefix + resourceName);
if (resourceMetric == null) {
resourceMetric = registry.newGauge(metricPrefix + resourceName,
metricDesc.replace("NAME", resourceName), 0L);
}
resourceMetric.set(resourceValue);
}
}
public void setAvailable(Resource res) {
available.set(res);
}
public void increaseAllocated(Resource res) {
allocated.increase(res);
}
public void increaseAllocated(Resource res, int containers) {
allocated.increaseWithMultiplier(res, containers);
}
public void decreaseAllocated(Resource res) {
allocated.decrease(res);
}
public void decreaseAllocated(Resource res, int containers) {
allocated.decreaseWithMultiplier(res, containers);
}
public Map<String, Long> getAllocatedValues() {
return allocated.getValues();
}
public Map<String, Long> getAvailableValues() {
return available.getValues();
}
public CustomResourceMetricValue getAvailable() {
return available;
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Provides common metrics (available, allocated) for custom resources.
*/
@InterfaceAudience.Private
package org.apache.hadoop.yarn.metrics;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
import java.util.Map; import java.util.Map;
@ -27,10 +28,10 @@
*/ */
public class CSQueueMetricsForCustomResources public class CSQueueMetricsForCustomResources
extends QueueMetricsForCustomResources { extends QueueMetricsForCustomResources {
private final QueueMetricsCustomResource guaranteedCapacity = private final CustomResourceMetricValue guaranteedCapacity =
new QueueMetricsCustomResource(); new CustomResourceMetricValue();
private final QueueMetricsCustomResource maxCapacity = private final CustomResourceMetricValue maxCapacity =
new QueueMetricsCustomResource(); new CustomResourceMetricValue();
public void setGuaranteedCapacity(Resource res) { public void setGuaranteedCapacity(Resource res) {
guaranteedCapacity.set(res); guaranteedCapacity.set(res);

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
import java.util.Map; import java.util.Map;
@ -26,20 +27,20 @@
* It provides increase and decrease methods for all types of metrics. * It provides increase and decrease methods for all types of metrics.
*/ */
public class FSQueueMetricsForCustomResources { public class FSQueueMetricsForCustomResources {
private final QueueMetricsCustomResource fairShare = private final CustomResourceMetricValue
new QueueMetricsCustomResource(); fairShare = new CustomResourceMetricValue();
private final QueueMetricsCustomResource steadyFairShare = private final CustomResourceMetricValue steadyFairShare =
new QueueMetricsCustomResource(); new CustomResourceMetricValue();
private final QueueMetricsCustomResource minShare = private final CustomResourceMetricValue
new QueueMetricsCustomResource(); minShare = new CustomResourceMetricValue();
private final QueueMetricsCustomResource maxShare = private final CustomResourceMetricValue
new QueueMetricsCustomResource(); maxShare = new CustomResourceMetricValue();
private final QueueMetricsCustomResource maxAMShare = private final CustomResourceMetricValue
new QueueMetricsCustomResource(); maxAMShare = new CustomResourceMetricValue();
private final QueueMetricsCustomResource amResourceUsage = private final CustomResourceMetricValue amResourceUsage =
new QueueMetricsCustomResource(); new CustomResourceMetricValue();
public QueueMetricsCustomResource getFairShare() { public CustomResourceMetricValue getFairShare() {
return fairShare; return fairShare;
} }
@ -51,7 +52,7 @@ public Map<String, Long> getFairShareValues() {
return fairShare.getValues(); return fairShare.getValues();
} }
public QueueMetricsCustomResource getSteadyFairShare() { public CustomResourceMetricValue getSteadyFairShare() {
return steadyFairShare; return steadyFairShare;
} }
@ -63,7 +64,7 @@ public Map<String, Long> getSteadyFairShareValues() {
return steadyFairShare.getValues(); return steadyFairShare.getValues();
} }
public QueueMetricsCustomResource getMinShare() { public CustomResourceMetricValue getMinShare() {
return minShare; return minShare;
} }
@ -75,7 +76,7 @@ public Map<String, Long> getMinShareValues() {
return minShare.getValues(); return minShare.getValues();
} }
public QueueMetricsCustomResource getMaxShare() { public CustomResourceMetricValue getMaxShare() {
return maxShare; return maxShare;
} }
@ -87,7 +88,7 @@ public Map<String, Long> getMaxShareValues() {
return maxShare.getValues(); return maxShare.getValues();
} }
public QueueMetricsCustomResource getMaxAMShare() { public CustomResourceMetricValue getMaxAMShare() {
return maxAMShare; return maxAMShare;
} }
@ -99,7 +100,7 @@ public Map<String, Long> getMaxAMShareValues() {
return maxAMShare.getValues(); return maxAMShare.getValues();
} }
public QueueMetricsCustomResource getAMResourceUsage() { public CustomResourceMetricValue getAMResourceUsage() {
return amResourceUsage; return amResourceUsage;
} }

View File

@ -23,7 +23,6 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -43,8 +42,8 @@
import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@ -533,8 +532,8 @@ public void setAvailableResources(Resource limit) {
availableVCores.set(limit.getVirtualCores()); availableVCores.set(limit.getVirtualCores());
if (queueMetricsForCustomResources != null) { if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.setAvailable(limit); queueMetricsForCustomResources.setAvailable(limit);
registerCustomResources( queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getAvailableValues(), queueMetricsForCustomResources.getAvailableValues(), registry,
AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC); AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
} }
} }
@ -616,16 +615,6 @@ public void internalIncrPendingResources(String partition, String user,
} }
} }
protected Map<String, Long> initAndGetCustomResources() {
Map<String, Long> customResources = new HashMap<String, Long>();
ResourceInformation[] resources = ResourceUtils.getResourceTypesArray();
for (int i = 2; i < resources.length; i++) {
ResourceInformation resource = resources[i];
customResources.put(resource.getName(), Long.valueOf(0));
}
return customResources;
}
protected void createQueueMetricsForCustomResources() { protected void createQueueMetricsForCustomResources() {
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
@ -635,43 +624,21 @@ protected void createQueueMetricsForCustomResources() {
} }
} }
/**
* Register all custom resources metrics as part of initialization. As and
* when this metric object construction happens for any queue, all custom
* resource metrics value would be initialized with '0' like any other
* mandatory resources metrics
*/
protected void registerCustomResources() { protected void registerCustomResources() {
Map<String, Long> customResources = initAndGetCustomResources(); Map<String, Long> customResources =
registerCustomResources(customResources, ALLOCATED_RESOURCE_METRIC_PREFIX, queueMetricsForCustomResources.initAndGetCustomResources();
ALLOCATED_RESOURCE_METRIC_DESC); queueMetricsForCustomResources
registerCustomResources(customResources, AVAILABLE_RESOURCE_METRIC_PREFIX, .registerCustomResources(customResources, this.registry);
AVAILABLE_RESOURCE_METRIC_DESC); queueMetricsForCustomResources
registerCustomResources(customResources, PENDING_RESOURCE_METRIC_PREFIX, .registerCustomResources(customResources, this.registry,
PENDING_RESOURCE_METRIC_DESC); PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
registerCustomResources(customResources, RESERVED_RESOURCE_METRIC_PREFIX, queueMetricsForCustomResources
RESERVED_RESOURCE_METRIC_DESC); .registerCustomResources(customResources, this.registry,
registerCustomResources(customResources, RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX, queueMetricsForCustomResources
AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC); .registerCustomResources(customResources, this.registry,
} AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX,
AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC);
protected void registerCustomResources(Map<String, Long> customResources,
String metricPrefix, String metricDesc) {
for (Entry<String, Long> entry : customResources.entrySet()) {
String resourceName = entry.getKey();
Long resourceValue = entry.getValue();
MutableGaugeLong resourceMetric =
(MutableGaugeLong) this.registry.get(metricPrefix + resourceName);
if (resourceMetric == null) {
resourceMetric =
this.registry.newGauge(metricPrefix + resourceName,
metricDesc.replace("NAME", resourceName), 0L);
}
resourceMetric.set(resourceValue);
}
} }
private void incrementPendingResources(int containers, Resource res) { private void incrementPendingResources(int containers, Resource res) {
@ -680,7 +647,8 @@ private void incrementPendingResources(int containers, Resource res) {
pendingVCores.incr(res.getVirtualCores() * containers); pendingVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) { if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increasePending(res, containers); queueMetricsForCustomResources.increasePending(res, containers);
registerCustomResources(queueMetricsForCustomResources.getPendingValues(), queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getPendingValues(), this.registry,
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
} }
} }
@ -722,7 +690,8 @@ private void decrementPendingResources(int containers, Resource res) {
pendingVCores.decr(res.getVirtualCores() * containers); pendingVCores.decr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) { if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreasePending(res, containers); queueMetricsForCustomResources.decreasePending(res, containers);
registerCustomResources(queueMetricsForCustomResources.getPendingValues(), queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getPendingValues(), this.registry,
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
} }
} }
@ -793,8 +762,8 @@ private void computeAllocateResources(int containers, Resource res,
allocatedVCores.incr(res.getVirtualCores() * containers); allocatedVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) { if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAllocated(res, containers); queueMetricsForCustomResources.increaseAllocated(res, containers);
registerCustomResources( queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(), queueMetricsForCustomResources.getAllocatedValues(), this.registry,
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
} }
if (decrPending) { if (decrPending) {
@ -813,8 +782,8 @@ public void allocateResources(String partition, String user, Resource res) {
allocatedVCores.incr(res.getVirtualCores()); allocatedVCores.incr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) { if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAllocated(res); queueMetricsForCustomResources.increaseAllocated(res);
registerCustomResources( queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(), queueMetricsForCustomResources.getAllocatedValues(), this.registry,
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
} }
@ -822,7 +791,8 @@ public void allocateResources(String partition, String user, Resource res) {
pendingVCores.decr(res.getVirtualCores()); pendingVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) { if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreasePending(res); queueMetricsForCustomResources.decreasePending(res);
registerCustomResources(queueMetricsForCustomResources.getPendingValues(), queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getPendingValues(), this.registry,
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
} }
@ -879,8 +849,8 @@ private void computeReleaseResources(int containers, Resource res) {
allocatedVCores.decr(res.getVirtualCores() * containers); allocatedVCores.decr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) { if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseAllocated(res, containers); queueMetricsForCustomResources.decreaseAllocated(res, containers);
registerCustomResources( queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(), queueMetricsForCustomResources.getAllocatedValues(), this.registry,
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
} }
} }
@ -928,9 +898,9 @@ public void updatePreemptedSecondsForCustomResources(Resource res,
if (queueMetricsForCustomResources != null) { if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources queueMetricsForCustomResources
.increaseAggregatedPreemptedSeconds(res, seconds); .increaseAggregatedPreemptedSeconds(res, seconds);
registerCustomResources( queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getAggregatePreemptedSeconds() queueMetricsForCustomResources.getAggregatePreemptedSeconds()
.getValues(), .getValues(), this.registry,
AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX, AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX,
AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC); AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC);
} }
@ -971,8 +941,8 @@ public void incrReserveResources(Resource res) {
reservedVCores.incr(res.getVirtualCores()); reservedVCores.incr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) { if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseReserved(res); queueMetricsForCustomResources.increaseReserved(res);
registerCustomResources( queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getReservedValues(), queueMetricsForCustomResources.getReservedValues(), this.registry,
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC); RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
} }
} }
@ -1010,8 +980,8 @@ public void decrReserveResource(Resource res) {
reservedVCores.decr(res.getVirtualCores()); reservedVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) { if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseReserved(res); queueMetricsForCustomResources.decreaseReserved(res);
registerCustomResources( queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getReservedValues(), queueMetricsForCustomResources.getReservedValues(), this.registry,
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC); RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
} }
} }
@ -1114,7 +1084,7 @@ public Resource getReservedResources() {
* @return QueueMetricsCustomResource * @return QueueMetricsCustomResource
*/ */
@VisibleForTesting @VisibleForTesting
public QueueMetricsCustomResource getAggregatedPreemptedSecondsResources() { public CustomResourceMetricValue getAggregatedPreemptedSecondsResources() {
return queueMetricsForCustomResources.getAggregatePreemptedSeconds(); return queueMetricsForCustomResources.getAggregatePreemptedSeconds();
} }
@ -1232,7 +1202,7 @@ public long getAggregatePreemptedContainers() {
public void fillInValuesFromAvailableResources(Resource fromResource, public void fillInValuesFromAvailableResources(Resource fromResource,
Resource targetResource) { Resource targetResource) {
if (queueMetricsForCustomResources != null) { if (queueMetricsForCustomResources != null) {
QueueMetricsCustomResource availableResources = CustomResourceMetricValue availableResources =
queueMetricsForCustomResources.getAvailable(); queueMetricsForCustomResources.getAvailable();
// We expect all custom resources contained in availableResources, // We expect all custom resources contained in availableResources,
@ -1257,7 +1227,7 @@ public QueueMetricsForCustomResources getQueueMetricsForCustomResources() {
return this.queueMetricsForCustomResources; return this.queueMetricsForCustomResources;
} }
public void setQueueMetricsForCustomResources( protected void setQueueMetricsForCustomResources(
QueueMetricsForCustomResources metrics) { QueueMetricsForCustomResources metrics) {
this.queueMetricsForCustomResources = metrics; this.queueMetricsForCustomResources = metrics;
} }

View File

@ -1,4 +1,4 @@
/* /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -6,7 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -17,28 +19,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.metrics.CustomResourceMetrics;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
import java.util.Map; import java.util.Map;
/** public class QueueMetricsForCustomResources extends CustomResourceMetrics {
* This class is a main entry-point for any kind of metrics for private final CustomResourceMetricValue aggregatePreemptedSeconds =
* custom resources. new CustomResourceMetricValue();
* It provides increase and decrease methods for all types of metrics. private final CustomResourceMetricValue aggregatePreempted =
*/ new CustomResourceMetricValue();
public class QueueMetricsForCustomResources { private final CustomResourceMetricValue pending =
private final QueueMetricsCustomResource aggregatePreemptedSeconds = new CustomResourceMetricValue();
new QueueMetricsCustomResource(); private final CustomResourceMetricValue reserved =
private final QueueMetricsCustomResource aggregatePreempted = new CustomResourceMetricValue();
new QueueMetricsCustomResource();
private final QueueMetricsCustomResource allocated =
new QueueMetricsCustomResource();
private final QueueMetricsCustomResource available =
new QueueMetricsCustomResource();
private final QueueMetricsCustomResource pending =
new QueueMetricsCustomResource();
private final QueueMetricsCustomResource reserved =
new QueueMetricsCustomResource();
public void increaseReserved(Resource res) { public void increaseReserved(Resource res) {
reserved.increase(res); reserved.increase(res);
@ -48,10 +42,6 @@ public void decreaseReserved(Resource res) {
reserved.decrease(res); reserved.decrease(res);
} }
public void setAvailable(Resource res) {
available.set(res);
}
public void increasePending(Resource res, int containers) { public void increasePending(Resource res, int containers) {
pending.increaseWithMultiplier(res, containers); pending.increaseWithMultiplier(res, containers);
} }
@ -64,20 +54,12 @@ public void decreasePending(Resource res, int containers) {
pending.decreaseWithMultiplier(res, containers); pending.decreaseWithMultiplier(res, containers);
} }
public void increaseAllocated(Resource res) { public Map<String, Long> getPendingValues() {
allocated.increase(res); return pending.getValues();
} }
public void increaseAllocated(Resource res, int containers) { public Map<String, Long> getReservedValues() {
allocated.increaseWithMultiplier(res, containers); return reserved.getValues();
}
public void decreaseAllocated(Resource res) {
allocated.decrease(res);
}
public void decreaseAllocated(Resource res, int containers) {
allocated.decreaseWithMultiplier(res, containers);
} }
public void increaseAggregatedPreemptedSeconds(Resource res, long seconds) { public void increaseAggregatedPreemptedSeconds(Resource res, long seconds) {
@ -88,27 +70,7 @@ public void increaseAggregatedPreempted(Resource res) {
aggregatePreempted.increase(res); aggregatePreempted.increase(res);
} }
Map<String, Long> getAllocatedValues() { CustomResourceMetricValue getAggregatePreemptedSeconds() {
return allocated.getValues();
}
Map<String, Long> getAvailableValues() {
return available.getValues();
}
Map<String, Long> getPendingValues() {
return pending.getValues();
}
Map<String, Long> getReservedValues() {
return reserved.getValues();
}
QueueMetricsCustomResource getAggregatePreemptedSeconds() {
return aggregatePreemptedSeconds; return aggregatePreemptedSeconds;
} }
public QueueMetricsCustomResource getAvailable() {
return available;
}
} }

View File

@ -78,6 +78,8 @@ public class CSQueueMetrics extends QueueMetrics {
private static final String MAX_CAPACITY_METRIC_DESC = private static final String MAX_CAPACITY_METRIC_DESC =
"MaxCapacity of NAME"; "MaxCapacity of NAME";
private CSQueueMetricsForCustomResources csQueueMetricsForCustomResources;
CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent, CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) { boolean enableUserMetrics, Configuration conf) {
super(ms, queueName, parent, enableUserMetrics, conf); super(ms, queueName, parent, enableUserMetrics, conf);
@ -90,11 +92,14 @@ public class CSQueueMetrics extends QueueMetrics {
* mandatory resources metrics * mandatory resources metrics
*/ */
protected void registerCustomResources() { protected void registerCustomResources() {
Map<String, Long> customResources = initAndGetCustomResources(); Map<String, Long> customResources =
registerCustomResources(customResources, GUARANTEED_CAPACITY_METRIC_PREFIX, csQueueMetricsForCustomResources.initAndGetCustomResources();
GUARANTEED_CAPACITY_METRIC_DESC); csQueueMetricsForCustomResources
registerCustomResources(customResources, MAX_CAPACITY_METRIC_PREFIX, .registerCustomResources(customResources, this.registry,
MAX_CAPACITY_METRIC_DESC); GUARANTEED_CAPACITY_METRIC_PREFIX, GUARANTEED_CAPACITY_METRIC_DESC);
csQueueMetricsForCustomResources
.registerCustomResources(customResources, this.registry,
MAX_CAPACITY_METRIC_PREFIX, MAX_CAPACITY_METRIC_DESC);
super.registerCustomResources(); super.registerCustomResources();
} }
@ -184,12 +189,10 @@ public void setGuaranteedResources(String partition, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
guaranteedMB.set(res.getMemorySize()); guaranteedMB.set(res.getMemorySize());
guaranteedVCores.set(res.getVirtualCores()); guaranteedVCores.set(res.getVirtualCores());
if (getQueueMetricsForCustomResources() != null) { if (csQueueMetricsForCustomResources != null) {
((CSQueueMetricsForCustomResources) getQueueMetricsForCustomResources()) csQueueMetricsForCustomResources.setGuaranteedCapacity(res);
.setGuaranteedCapacity(res); csQueueMetricsForCustomResources.registerCustomResources(
registerCustomResources( csQueueMetricsForCustomResources.getGuaranteedCapacity(), registry,
((CSQueueMetricsForCustomResources)
getQueueMetricsForCustomResources()).getGuaranteedCapacity(),
GUARANTEED_CAPACITY_METRIC_PREFIX, GUARANTEED_CAPACITY_METRIC_DESC); GUARANTEED_CAPACITY_METRIC_PREFIX, GUARANTEED_CAPACITY_METRIC_DESC);
} }
} }
@ -207,12 +210,10 @@ public void setMaxCapacityResources(String partition, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
maxCapacityMB.set(res.getMemorySize()); maxCapacityMB.set(res.getMemorySize());
maxCapacityVCores.set(res.getVirtualCores()); maxCapacityVCores.set(res.getVirtualCores());
if (getQueueMetricsForCustomResources() != null) { if (csQueueMetricsForCustomResources != null) {
((CSQueueMetricsForCustomResources) getQueueMetricsForCustomResources()) csQueueMetricsForCustomResources.setMaxCapacity(res);
.setMaxCapacity(res); csQueueMetricsForCustomResources.registerCustomResources(
registerCustomResources( csQueueMetricsForCustomResources.getMaxCapacity(), registry,
((CSQueueMetricsForCustomResources)
getQueueMetricsForCustomResources()).getMaxCapacity(),
MAX_CAPACITY_METRIC_PREFIX, MAX_CAPACITY_METRIC_DESC); MAX_CAPACITY_METRIC_PREFIX, MAX_CAPACITY_METRIC_DESC);
} }
} }
@ -221,7 +222,9 @@ public void setMaxCapacityResources(String partition, Resource res) {
@Override @Override
protected void createQueueMetricsForCustomResources() { protected void createQueueMetricsForCustomResources() {
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
setQueueMetricsForCustomResources(new CSQueueMetricsForCustomResources()); this.csQueueMetricsForCustomResources =
new CSQueueMetricsForCustomResources();
setQueueMetricsForCustomResources(csQueueMetricsForCustomResources);
registerCustomResources(); registerCustomResources();
} }
} }

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@ -293,7 +294,7 @@ private void testUpdatePreemptedSeconds(QueueMetricsTestData testData,
} }
private Resource convertPreemptedSecondsToResource(QueueMetrics qm) { private Resource convertPreemptedSecondsToResource(QueueMetrics qm) {
QueueMetricsCustomResource customValues = qm CustomResourceMetricValue customValues = qm
.getAggregatedPreemptedSecondsResources(); .getAggregatedPreemptedSecondsResources();
MutableCounterLong vcoreSeconds = qm MutableCounterLong vcoreSeconds = qm
.getAggregateVcoreSecondsPreempted(); .getAggregateVcoreSecondsPreempted();

View File

@ -38,11 +38,11 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsCustomResource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@ -344,7 +344,7 @@ private void verifyAMShare(FSLeafQueue schedulable,
private Map<String, Long> verifyQueueMetricsForCustomResources( private Map<String, Long> verifyQueueMetricsForCustomResources(
FSLeafQueue schedulable) { FSLeafQueue schedulable) {
QueueMetricsCustomResource maxAMShareCustomResources = CustomResourceMetricValue maxAMShareCustomResources =
schedulable.getMetrics().getCustomResources().getMaxAMShare(); schedulable.getMetrics().getCustomResources().getMaxAMShare();
Map<String, Long> customResourceValues = maxAMShareCustomResources Map<String, Long> customResourceValues = maxAMShareCustomResources
.getValues(); .getValues();