YARN-11152. QueueMetrics is leaking memory when creating a new queue during reinitialisation

This commit is contained in:
9uapaw 2022-05-16 10:40:46 +02:00 committed by Benjamin Teke
parent 54cd0174c0
commit 0e6a6d1880
5 changed files with 121 additions and 1 deletions

View File

@ -40,6 +40,7 @@ public class PartitionQueueMetrics extends QueueMetrics {
String parentMetricName = String parentMetricName =
partition + METRIC_NAME_DELIMITER + newQueueName; partition + METRIC_NAME_DELIMITER + newQueueName;
setParent(getQueueMetrics().get(parentMetricName)); setParent(getQueueMetrics().get(parentMetricName));
storedPartitionMetrics = null;
} }
} }

View File

@ -22,7 +22,9 @@ import static org.apache.hadoop.metrics2.lib.Interns.info;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -40,12 +42,14 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue; import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -133,7 +137,7 @@ public class QueueMetrics implements MetricsSource {
protected final MetricsRegistry registry; protected final MetricsRegistry registry;
protected final String queueName; protected final String queueName;
private QueueMetrics parent; private QueueMetrics parent;
private final Queue parentQueue; private Queue parentQueue;
protected final MetricsSystem metricsSystem; protected final MetricsSystem metricsSystem;
protected final Map<String, QueueMetrics> users; protected final Map<String, QueueMetrics> users;
protected final Configuration conf; protected final Configuration conf;
@ -177,6 +181,7 @@ public class QueueMetrics implements MetricsSource {
"AggregatePreemptedSeconds."; "AggregatePreemptedSeconds.";
private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC = private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC =
"Aggregate Preempted Seconds for NAME"; "Aggregate Preempted Seconds for NAME";
protected Set<String> storedPartitionMetrics = Sets.newConcurrentHashSet();
public QueueMetrics(MetricsSystem ms, String queueName, Queue parent, public QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) { boolean enableUserMetrics, Configuration conf) {
@ -338,6 +343,7 @@ public class QueueMetrics implements MetricsSource {
queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO, queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO,
this.queueName)); this.queueName));
getQueueMetrics().put(metricName, queueMetrics); getQueueMetrics().put(metricName, queueMetrics);
registerPartitionMetricsCreation(metricName);
return queueMetrics; return queueMetrics;
} else { } else {
return metrics; return metrics;
@ -380,6 +386,7 @@ public class QueueMetrics implements MetricsSource {
partitionJMXStr)); partitionJMXStr));
} }
getQueueMetrics().put(metricName, metrics); getQueueMetrics().put(metricName, metrics);
registerPartitionMetricsCreation(metricName);
} }
return metrics; return metrics;
} }
@ -1332,4 +1339,26 @@ public class QueueMetrics implements MetricsSource {
public Queue getParentQueue() { public Queue getParentQueue() {
return parentQueue; return parentQueue;
} }
protected void registerPartitionMetricsCreation(String metricName) {
if (storedPartitionMetrics != null) {
storedPartitionMetrics.add(metricName);
}
}
public void setParentQueue(Queue parentQueue) {
this.parentQueue = parentQueue;
if (storedPartitionMetrics == null) {
return;
}
for (String partitionMetric : storedPartitionMetrics) {
QueueMetrics metric = getQueueMetrics().get(partitionMetric);
if (metric != null && metric.parentQueue != null) {
metric.parentQueue = parentQueue;
}
}
}
} }

View File

@ -262,6 +262,7 @@ public abstract class AbstractCSQueue implements CSQueue {
@Override @Override
public void setParent(CSQueue newParentQueue) { public void setParent(CSQueue newParentQueue) {
this.parent = newParentQueue; this.parent = newParentQueue;
getMetrics().setParentQueue(newParentQueue);
} }
@Override @Override

View File

@ -90,6 +90,35 @@ public final class CapacitySchedulerQueueHelpers {
return conf; return conf;
} }
public static CapacitySchedulerConfiguration setupAdditionalQueues(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{"a", "b"});
conf.setCapacity(A, A_CAPACITY);
conf.setCapacity(B, B_CAPACITY);
// Define 2nd-level queues
conf.setQueues(A, new String[]{"a1", "a2", "a3"});
conf.setCapacity(A1, 30.0f);
conf.setUserLimitFactor(A1, 100.0f);
conf.setCapacity(A2, 30.0f);
conf.setUserLimitFactor(A2, 100.0f);
conf.setCapacity("root.a.a3", 40.0f);
conf.setUserLimitFactor("root.a.a3", 100.0f);
conf.setQueues(B, new String[]{"b1", "b2", "b3"});
conf.setCapacity(B1, B1_CAPACITY);
conf.setUserLimitFactor(B1, 100.0f);
conf.setCapacity(B2, B2_CAPACITY);
conf.setUserLimitFactor(B2, 100.0f);
conf.setCapacity(B3, B3_CAPACITY);
conf.setUserLimitFactor(B3, 100.0f);
return conf;
}
/** /**
* @param conf, to be modified * @param conf, to be modified
* @return CS configuration which has deleted all children of queue(b) * @return CS configuration which has deleted all children of queue(b)

View File

@ -24,6 +24,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.C
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocMb; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocMb;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocVcores; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocVcores;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupAdditionalQueues;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupBlockedQueueConfiguration; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupBlockedQueueConfiguration;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupOtherBlockedQueueConfiguration; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupOtherBlockedQueueConfiguration;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
@ -81,6 +82,7 @@ import java.util.concurrent.CyclicBarrier;
import org.apache.hadoop.util.Sets; import org.apache.hadoop.util.Sets;
import org.apache.hadoop.service.ServiceStateException; import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -2663,6 +2665,64 @@ public class TestCapacityScheduler {
ContainerAllocation.QUEUE_SKIPPED.getAllocationState()); ContainerAllocation.QUEUE_SKIPPED.getAllocationState());
} }
/**
* Tests
* @throws Exception
*/
@Test
public void testCSQueueMetricsDoesNotLeakOnReinit() throws Exception {
// Initialize resource map
Map<String, ResourceInformation> riMap = new HashMap<>();
// Initialize mandatory resources
ResourceInformation memory =
ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(),
ResourceInformation.MEMORY_MB.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
ResourceInformation vcores =
ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
ResourceInformation.VCORES.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
riMap.put(ResourceInformation.MEMORY_URI, memory);
riMap.put(ResourceInformation.VCORES_URI, vcores);
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.setResourceComparator(DominantResourceCalculator.class);
setupQueueConfiguration(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
// Don't reset resource types since we have already configured resource
// types
conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
csConf = new CapacitySchedulerConfiguration();
setupAdditionalQueues(csConf);
cs.reinitialize(csConf, cs.getRMContext());
QueueMetrics a3DefaultPartitionMetrics = QueueMetrics.getQueueMetrics().get(
"default.root.a.a3");
Assert.assertSame("Different ParentQueue of siblings is a sign of a memory leak",
QueueMetrics.getQueueMetrics().get("root.a.a1").getParentQueue(),
QueueMetrics.getQueueMetrics().get("root.a.a3").getParentQueue());
Assert.assertSame("Different ParentQueue of partition metrics is a sign of a memory leak",
QueueMetrics.getQueueMetrics().get("root.a.a1").getParentQueue(),
a3DefaultPartitionMetrics.getParentQueue());
}
@Test @Test
public void testCSQueueMetrics() throws Exception { public void testCSQueueMetrics() throws Exception {