YARN-3363. add localization and container launch time to ContainerMetrics at NM to show these timing information for each active container. (zxu via rkanter)
(cherry picked from commit ac7d152901e29b1f444507fe4e421eb6e1402b5a)
This commit is contained in:
parent
72751b957c
commit
32b3b8e2ef
@ -129,6 +129,10 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-3406. Display count of running containers in the RM's Web UI.
|
||||
(Ryu Kobayashi via ozawa)
|
||||
|
||||
YARN-3363. add localization and container launch time to ContainerMetrics
|
||||
at NM to show these timing information for each active container.
|
||||
(zxu via rkanter)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||
|
@ -96,6 +96,7 @@ public class ContainerImpl implements Container {
|
||||
private int exitCode = ContainerExitStatus.INVALID;
|
||||
private final StringBuilder diagnostics;
|
||||
private boolean wasLaunched;
|
||||
private long containerLocalizationStartTime;
|
||||
private long containerLaunchStartTime;
|
||||
private static Clock clock = new SystemClock();
|
||||
|
||||
@ -493,16 +494,21 @@ private void sendLaunchEvent() {
|
||||
// resource usage.
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
private void sendContainerMonitorStartEvent() {
|
||||
long pmemBytes = getResource().getMemory() * 1024 * 1024L;
|
||||
float pmemRatio = daemonConf.getFloat(
|
||||
YarnConfiguration.NM_VMEM_PMEM_RATIO,
|
||||
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
|
||||
long vmemBytes = (long) (pmemRatio * pmemBytes);
|
||||
int cpuVcores = getResource().getVirtualCores();
|
||||
long launchDuration = clock.getTime() - containerLaunchStartTime;
|
||||
metrics.addContainerLaunchDuration(launchDuration);
|
||||
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerStartMonitoringEvent(containerId,
|
||||
vmemBytes, pmemBytes, cpuVcores));
|
||||
long pmemBytes = getResource().getMemory() * 1024 * 1024L;
|
||||
float pmemRatio = daemonConf.getFloat(
|
||||
YarnConfiguration.NM_VMEM_PMEM_RATIO,
|
||||
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
|
||||
long vmemBytes = (long) (pmemRatio * pmemBytes);
|
||||
int cpuVcores = getResource().getVirtualCores();
|
||||
long localizationDuration = containerLaunchStartTime -
|
||||
containerLocalizationStartTime;
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerStartMonitoringEvent(containerId,
|
||||
vmemBytes, pmemBytes, cpuVcores, launchDuration,
|
||||
localizationDuration));
|
||||
}
|
||||
|
||||
private void addDiagnostics(String... diags) {
|
||||
@ -601,6 +607,7 @@ public ContainerState transition(ContainerImpl container,
|
||||
}
|
||||
}
|
||||
|
||||
container.containerLocalizationStartTime = clock.getTime();
|
||||
// Send requests for public, private resources
|
||||
Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
|
||||
if (!cntrRsrc.isEmpty()) {
|
||||
@ -756,8 +763,6 @@ public void transition(ContainerImpl container, ContainerEvent event) {
|
||||
container.sendContainerMonitorStartEvent();
|
||||
container.metrics.runningContainer();
|
||||
container.wasLaunched = true;
|
||||
long duration = clock.getTime() - container.containerLaunchStartTime;
|
||||
container.metrics.addContainerLaunchDuration(duration);
|
||||
|
||||
if (container.recoveredAsKilled) {
|
||||
LOG.info("Killing " + container.containerId
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableStat;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
||||
@ -46,6 +47,9 @@ public class ContainerMetrics implements MetricsSource {
|
||||
public static final String VMEM_LIMIT_METRIC_NAME = "vMemLimitMBs";
|
||||
public static final String VCORE_LIMIT_METRIC_NAME = "vCoreLimit";
|
||||
public static final String PMEM_USAGE_METRIC_NAME = "pMemUsageMBs";
|
||||
public static final String LAUNCH_DURATION_METRIC_NAME = "launchDurationMs";
|
||||
public static final String LOCALIZATION_DURATION_METRIC_NAME =
|
||||
"localizationDurationMs";
|
||||
private static final String PHY_CPU_USAGE_METRIC_NAME = "pCpuUsagePercent";
|
||||
|
||||
// Use a multiplier of 1000 to avoid losing too much precision when
|
||||
@ -74,6 +78,12 @@ public class ContainerMetrics implements MetricsSource {
|
||||
@Metric
|
||||
public MutableGaugeInt cpuVcoreLimit;
|
||||
|
||||
@Metric
|
||||
public MutableGaugeLong launchDurationMs;
|
||||
|
||||
@Metric
|
||||
public MutableGaugeLong localizationDurationMs;
|
||||
|
||||
static final MetricsInfo RECORD_INFO =
|
||||
info("ContainerResource", "Resource limit and usage by container");
|
||||
|
||||
@ -122,6 +132,10 @@ public class ContainerMetrics implements MetricsSource {
|
||||
VMEM_LIMIT_METRIC_NAME, "Virtual memory limit in MBs", 0);
|
||||
this.cpuVcoreLimit = registry.newGauge(
|
||||
VCORE_LIMIT_METRIC_NAME, "CPU limit in number of vcores", 0);
|
||||
this.launchDurationMs = registry.newGauge(
|
||||
LAUNCH_DURATION_METRIC_NAME, "Launch duration in MS", 0L);
|
||||
this.localizationDurationMs = registry.newGauge(
|
||||
LOCALIZATION_DURATION_METRIC_NAME, "Localization duration in MS", 0L);
|
||||
}
|
||||
|
||||
ContainerMetrics tag(MetricsInfo info, ContainerId containerId) {
|
||||
@ -213,6 +227,12 @@ public void recordResourceLimit(int vmemLimit, int pmemLimit, int cpuVcores) {
|
||||
this.cpuVcoreLimit.set(cpuVcores);
|
||||
}
|
||||
|
||||
public void recordStateChangeDurations(long launchDuration,
|
||||
long localizationDuration) {
|
||||
this.launchDurationMs.set(launchDuration);
|
||||
this.localizationDurationMs.set(localizationDuration);
|
||||
}
|
||||
|
||||
private synchronized void scheduleTimerTaskIfRequired() {
|
||||
if (flushPeriodMs > 0) {
|
||||
// Lazily initialize timer
|
||||
|
@ -25,13 +25,18 @@ public class ContainerStartMonitoringEvent extends ContainersMonitorEvent {
|
||||
private final long vmemLimit;
|
||||
private final long pmemLimit;
|
||||
private final int cpuVcores;
|
||||
private final long launchDuration;
|
||||
private final long localizationDuration;
|
||||
|
||||
public ContainerStartMonitoringEvent(ContainerId containerId,
|
||||
long vmemLimit, long pmemLimit, int cpuVcores) {
|
||||
long vmemLimit, long pmemLimit, int cpuVcores, long launchDuration,
|
||||
long localizationDuration) {
|
||||
super(containerId, ContainersMonitorEventType.START_MONITORING_CONTAINER);
|
||||
this.vmemLimit = vmemLimit;
|
||||
this.pmemLimit = pmemLimit;
|
||||
this.cpuVcores = cpuVcores;
|
||||
this.launchDuration = launchDuration;
|
||||
this.localizationDuration = localizationDuration;
|
||||
}
|
||||
|
||||
public long getVmemLimit() {
|
||||
@ -45,4 +50,12 @@ public long getPmemLimit() {
|
||||
public int getCpuVcores() {
|
||||
return this.cpuVcores;
|
||||
}
|
||||
|
||||
public long getLaunchDuration() {
|
||||
return this.launchDuration;
|
||||
}
|
||||
|
||||
public long getLocalizationDuration() {
|
||||
return this.localizationDuration;
|
||||
}
|
||||
}
|
||||
|
@ -617,6 +617,15 @@ public void handle(ContainersMonitorEvent monitoringEvent) {
|
||||
case START_MONITORING_CONTAINER:
|
||||
ContainerStartMonitoringEvent startEvent =
|
||||
(ContainerStartMonitoringEvent) monitoringEvent;
|
||||
|
||||
if (containerMetricsEnabled) {
|
||||
ContainerMetrics usageMetrics = ContainerMetrics
|
||||
.forContainer(containerId, containerMetricsPeriodMs);
|
||||
usageMetrics.recordStateChangeDurations(
|
||||
startEvent.getLaunchDuration(),
|
||||
startEvent.getLocalizationDuration());
|
||||
}
|
||||
|
||||
synchronized (this.containersToBeAdded) {
|
||||
ProcessTreeInfo processTreeInfo =
|
||||
new ProcessTreeInfo(containerId, null, null,
|
||||
|
@ -87,10 +87,14 @@ public void testContainerMetricsLimit() throws InterruptedException {
|
||||
int anyPmemLimit = 1024;
|
||||
int anyVmemLimit = 2048;
|
||||
int anyVcores = 10;
|
||||
long anyLaunchDuration = 20L;
|
||||
long anyLocalizationDuration = 1000L;
|
||||
String anyProcessId = "1234";
|
||||
|
||||
metrics.recordResourceLimit(anyVmemLimit, anyPmemLimit, anyVcores);
|
||||
metrics.recordProcessId(anyProcessId);
|
||||
metrics.recordStateChangeDurations(anyLaunchDuration,
|
||||
anyLocalizationDuration);
|
||||
|
||||
Thread.sleep(110);
|
||||
metrics.getMetrics(collector, true);
|
||||
@ -105,6 +109,12 @@ public void testContainerMetricsLimit() throws InterruptedException {
|
||||
MetricsRecords.assertMetric(record, ContainerMetrics.VMEM_LIMIT_METRIC_NAME, anyVmemLimit);
|
||||
MetricsRecords.assertMetric(record, ContainerMetrics.VCORE_LIMIT_METRIC_NAME, anyVcores);
|
||||
|
||||
MetricsRecords.assertMetric(record,
|
||||
ContainerMetrics.LAUNCH_DURATION_METRIC_NAME, anyLaunchDuration);
|
||||
MetricsRecords.assertMetric(record,
|
||||
ContainerMetrics.LOCALIZATION_DURATION_METRIC_NAME,
|
||||
anyLocalizationDuration);
|
||||
|
||||
collector.clear();
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user