YARN-2984. Metrics for container's actual memory usage. (kasha)
(cherry picked from commit 84198564ba
)
This commit is contained in:
parent
dcd6eed13a
commit
4d8fa9615f
|
@ -69,7 +69,8 @@ public class MetricsCollectorImpl implements MetricsCollector,
|
||||||
return rbs.iterator();
|
return rbs.iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
void clear() { rbs.clear(); }
|
@InterfaceAudience.Private
|
||||||
|
public void clear() { rbs.clear(); }
|
||||||
|
|
||||||
MetricsCollectorImpl setRecordFilter(MetricsFilter rf) {
|
MetricsCollectorImpl setRecordFilter(MetricsFilter rf) {
|
||||||
recordFilter = rf;
|
recordFilter = rf;
|
||||||
|
|
|
@ -161,6 +161,8 @@ Release 2.7.0 - UNRELEASED
|
||||||
YARN-2807. Option "--forceactive" not works as described in usage of
|
YARN-2807. Option "--forceactive" not works as described in usage of
|
||||||
"yarn rmadmin -transitionToActive". (Masatake Iwasaki via xgong)
|
"yarn rmadmin -transitionToActive". (Masatake Iwasaki via xgong)
|
||||||
|
|
||||||
|
YARN-2984. Metrics for container's actual memory usage. (kasha)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -823,6 +823,20 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final boolean DEFAULT_PROCFS_USE_SMAPS_BASED_RSS_ENABLED =
|
public static final boolean DEFAULT_PROCFS_USE_SMAPS_BASED_RSS_ENABLED =
|
||||||
false;
|
false;
|
||||||
|
|
||||||
|
/** Enable/disable container metrics. */
|
||||||
|
@Private
|
||||||
|
public static final String NM_CONTAINER_METRICS_ENABLE =
|
||||||
|
NM_PREFIX + "container-metrics.enable";
|
||||||
|
@Private
|
||||||
|
public static final boolean DEFAULT_NM_CONTAINER_METRICS_ENABLE = true;
|
||||||
|
|
||||||
|
/** Container metrics flush period. -1 for flush on completion. */
|
||||||
|
@Private
|
||||||
|
public static final String NM_CONTAINER_METRICS_PERIOD_MS =
|
||||||
|
NM_PREFIX + "container-metrics.period-ms";
|
||||||
|
@Private
|
||||||
|
public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1;
|
||||||
|
|
||||||
/** Prefix for all node manager disk health checker configs. */
|
/** Prefix for all node manager disk health checker configs. */
|
||||||
private static final String NM_DISK_HEALTH_CHECK_PREFIX =
|
private static final String NM_DISK_HEALTH_CHECK_PREFIX =
|
||||||
"yarn.nodemanager.disk-health-checker.";
|
"yarn.nodemanager.disk-health-checker.";
|
||||||
|
|
|
@ -0,0 +1,172 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSource;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableStat;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Timer;
|
||||||
|
import java.util.TimerTask;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@Metrics(context="container")
|
||||||
|
public class ContainerMetrics implements MetricsSource {
|
||||||
|
|
||||||
|
@Metric
|
||||||
|
public MutableStat pMemMBsStat;
|
||||||
|
|
||||||
|
static final MetricsInfo RECORD_INFO =
|
||||||
|
info("ContainerUsage", "Resource usage by container");
|
||||||
|
|
||||||
|
final MetricsInfo recordInfo;
|
||||||
|
final MetricsRegistry registry;
|
||||||
|
final ContainerId containerId;
|
||||||
|
final MetricsSystem metricsSystem;
|
||||||
|
|
||||||
|
// Metrics publishing status
|
||||||
|
private long flushPeriodMs;
|
||||||
|
private boolean flushOnPeriod = false; // true if period elapsed
|
||||||
|
private boolean finished = false; // true if container finished
|
||||||
|
private boolean unregister = false; // unregister
|
||||||
|
private Timer timer; // lazily initialized
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple metrics cache to help prevent re-registrations.
|
||||||
|
*/
|
||||||
|
protected final static Map<ContainerId, ContainerMetrics>
|
||||||
|
usageMetrics = new HashMap<>();
|
||||||
|
|
||||||
|
ContainerMetrics(
|
||||||
|
MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
|
||||||
|
this.recordInfo =
|
||||||
|
info(sourceName(containerId), RECORD_INFO.description());
|
||||||
|
this.registry = new MetricsRegistry(recordInfo);
|
||||||
|
this.metricsSystem = ms;
|
||||||
|
this.containerId = containerId;
|
||||||
|
this.flushPeriodMs = flushPeriodMs;
|
||||||
|
scheduleTimerTaskIfRequired();
|
||||||
|
|
||||||
|
this.pMemMBsStat = registry.newStat(
|
||||||
|
"pMem", "Physical memory stats", "Usage", "MBs", true);
|
||||||
|
}
|
||||||
|
|
||||||
|
ContainerMetrics tag(MetricsInfo info, ContainerId containerId) {
|
||||||
|
registry.tag(info, containerId.toString());
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
static String sourceName(ContainerId containerId) {
|
||||||
|
return RECORD_INFO.name() + "_" + containerId.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ContainerMetrics forContainer(ContainerId containerId) {
|
||||||
|
return forContainer(containerId, -1L);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ContainerMetrics forContainer(
|
||||||
|
ContainerId containerId, long flushPeriodMs) {
|
||||||
|
return forContainer(
|
||||||
|
DefaultMetricsSystem.instance(), containerId, flushPeriodMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized static ContainerMetrics forContainer(
|
||||||
|
MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
|
||||||
|
ContainerMetrics metrics = usageMetrics.get(containerId);
|
||||||
|
if (metrics == null) {
|
||||||
|
metrics = new ContainerMetrics(
|
||||||
|
ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId);
|
||||||
|
|
||||||
|
// Register with the MetricsSystems
|
||||||
|
if (ms != null) {
|
||||||
|
metrics =
|
||||||
|
ms.register(sourceName(containerId),
|
||||||
|
"Metrics for container: " + containerId, metrics);
|
||||||
|
}
|
||||||
|
usageMetrics.put(containerId, metrics);
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void getMetrics(MetricsCollector collector, boolean all) {
|
||||||
|
//Container goes through registered -> finished -> unregistered.
|
||||||
|
if (unregister) {
|
||||||
|
metricsSystem.unregisterSource(recordInfo.name());
|
||||||
|
usageMetrics.remove(containerId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (finished || flushOnPeriod) {
|
||||||
|
registry.snapshot(collector.addRecord(registry.info()), all);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (finished) {
|
||||||
|
this.unregister = true;
|
||||||
|
} else if (flushOnPeriod) {
|
||||||
|
flushOnPeriod = false;
|
||||||
|
scheduleTimerTaskIfRequired();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void finished() {
|
||||||
|
this.finished = true;
|
||||||
|
if (timer != null) {
|
||||||
|
timer.cancel();
|
||||||
|
timer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void recordMemoryUsage(int memoryMBs) {
|
||||||
|
this.pMemMBsStat.add(memoryMBs);
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void scheduleTimerTaskIfRequired() {
|
||||||
|
if (flushPeriodMs > 0) {
|
||||||
|
// Lazily initialize timer
|
||||||
|
if (timer == null) {
|
||||||
|
this.timer = new Timer("Metrics flush checker", true);
|
||||||
|
}
|
||||||
|
TimerTask timerTask = new TimerTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
synchronized (ContainerMetrics.this) {
|
||||||
|
if (!finished) {
|
||||||
|
flushOnPeriod = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
timer.schedule(timerTask, flushPeriodMs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -51,6 +51,8 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||||
|
|
||||||
private long monitoringInterval;
|
private long monitoringInterval;
|
||||||
private MonitoringThread monitoringThread;
|
private MonitoringThread monitoringThread;
|
||||||
|
private boolean containerMetricsEnabled;
|
||||||
|
private long containerMetricsPeriodMs;
|
||||||
|
|
||||||
final List<ContainerId> containersToBeRemoved;
|
final List<ContainerId> containersToBeRemoved;
|
||||||
final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
|
final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
|
||||||
|
@ -106,6 +108,13 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||||
LOG.info(" Using ResourceCalculatorProcessTree : "
|
LOG.info(" Using ResourceCalculatorProcessTree : "
|
||||||
+ this.processTreeClass);
|
+ this.processTreeClass);
|
||||||
|
|
||||||
|
this.containerMetricsEnabled =
|
||||||
|
conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
|
||||||
|
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE);
|
||||||
|
this.containerMetricsPeriodMs =
|
||||||
|
conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
|
||||||
|
|
||||||
long configuredPMemForContainers = conf.getLong(
|
long configuredPMemForContainers = conf.getLong(
|
||||||
YarnConfiguration.NM_PMEM_MB,
|
YarnConfiguration.NM_PMEM_MB,
|
||||||
YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l;
|
YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l;
|
||||||
|
@ -352,6 +361,9 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||||
// Remove finished containers
|
// Remove finished containers
|
||||||
synchronized (containersToBeRemoved) {
|
synchronized (containersToBeRemoved) {
|
||||||
for (ContainerId containerId : containersToBeRemoved) {
|
for (ContainerId containerId : containersToBeRemoved) {
|
||||||
|
if (containerMetricsEnabled) {
|
||||||
|
ContainerMetrics.forContainer(containerId).finished();
|
||||||
|
}
|
||||||
trackingContainers.remove(containerId);
|
trackingContainers.remove(containerId);
|
||||||
LOG.info("Stopping resource-monitoring for " + containerId);
|
LOG.info("Stopping resource-monitoring for " + containerId);
|
||||||
}
|
}
|
||||||
|
@ -408,7 +420,15 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||||
LOG.info(String.format(
|
LOG.info(String.format(
|
||||||
"Memory usage of ProcessTree %s for container-id %s: ",
|
"Memory usage of ProcessTree %s for container-id %s: ",
|
||||||
pId, containerId.toString()) +
|
pId, containerId.toString()) +
|
||||||
formatUsageString(currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit));
|
formatUsageString(
|
||||||
|
currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit));
|
||||||
|
|
||||||
|
// Add usage to container metrics
|
||||||
|
if (containerMetricsEnabled) {
|
||||||
|
ContainerMetrics.forContainer(
|
||||||
|
containerId, containerMetricsPeriodMs).recordMemoryUsage(
|
||||||
|
(int) (currentPmemUsage >> 20));
|
||||||
|
}
|
||||||
|
|
||||||
boolean isMemoryOverLimit = false;
|
boolean isMemoryOverLimit = false;
|
||||||
String msg = "";
|
String msg = "";
|
||||||
|
|
|
@ -0,0 +1,74 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
|
||||||
|
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
import java.util.Timer;
|
||||||
|
|
||||||
|
public class TestContainerMetrics {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerMetricsFlow() throws InterruptedException {
|
||||||
|
final String ERR = "Error in number of records";
|
||||||
|
|
||||||
|
// Create a dummy MetricsSystem
|
||||||
|
MetricsSystem system = mock(MetricsSystem.class);
|
||||||
|
doReturn(this).when(system).register(anyString(), anyString(), any());
|
||||||
|
|
||||||
|
MetricsCollectorImpl collector = new MetricsCollectorImpl();
|
||||||
|
ContainerId containerId = mock(ContainerId.class);
|
||||||
|
ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100);
|
||||||
|
|
||||||
|
metrics.recordMemoryUsage(1024);
|
||||||
|
metrics.getMetrics(collector, true);
|
||||||
|
assertEquals(ERR, 0, collector.getRecords().size());
|
||||||
|
|
||||||
|
Thread.sleep(110);
|
||||||
|
metrics.getMetrics(collector, true);
|
||||||
|
assertEquals(ERR, 1, collector.getRecords().size());
|
||||||
|
collector.clear();
|
||||||
|
|
||||||
|
Thread.sleep(110);
|
||||||
|
metrics.getMetrics(collector, true);
|
||||||
|
assertEquals(ERR, 1, collector.getRecords().size());
|
||||||
|
collector.clear();
|
||||||
|
|
||||||
|
metrics.finished();
|
||||||
|
metrics.getMetrics(collector, true);
|
||||||
|
assertEquals(ERR, 1, collector.getRecords().size());
|
||||||
|
collector.clear();
|
||||||
|
|
||||||
|
metrics.getMetrics(collector, true);
|
||||||
|
assertEquals(ERR, 0, collector.getRecords().size());
|
||||||
|
|
||||||
|
Thread.sleep(110);
|
||||||
|
metrics.getMetrics(collector, true);
|
||||||
|
assertEquals(ERR, 0, collector.getRecords().size());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue