diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java index be442edb1b8..5345c1baf88 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java @@ -69,7 +69,8 @@ public class MetricsCollectorImpl implements MetricsCollector, return rbs.iterator(); } - void clear() { rbs.clear(); } + @InterfaceAudience.Private + public void clear() { rbs.clear(); } MetricsCollectorImpl setRecordFilter(MetricsFilter rf) { recordFilter = rf; diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f9d4f54af86..7ee91d31d3a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -161,6 +161,8 @@ Release 2.7.0 - UNRELEASED YARN-2807. Option "--forceactive" not works as described in usage of "yarn rmadmin -transitionToActive". (Masatake Iwasaki via xgong) + YARN-2984. Metrics for container's actual memory usage. (kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index f4e76820643..3df1292e217 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -822,6 +822,20 @@ public class YarnConfiguration extends Configuration { "container-monitor.procfs-tree.smaps-based-rss.enabled"; public static final boolean DEFAULT_PROCFS_USE_SMAPS_BASED_RSS_ENABLED = false; + + /** Enable/disable container metrics. */ + @Private + public static final String NM_CONTAINER_METRICS_ENABLE = + NM_PREFIX + "container-metrics.enable"; + @Private + public static final boolean DEFAULT_NM_CONTAINER_METRICS_ENABLE = true; + + /** Container metrics flush period. -1 for flush on completion. */ + @Private + public static final String NM_CONTAINER_METRICS_PERIOD_MS = + NM_PREFIX + "container-metrics.period-ms"; + @Private + public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1; /** Prefix for all node manager disk health checker configs. */ private static final String NM_DISK_HEALTH_CHECK_PREFIX = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java new file mode 100644 index 00000000000..12201dd3ac5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableStat; +import org.apache.hadoop.yarn.api.records.ContainerId; + +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +@InterfaceAudience.Private +@Metrics(context="container") +public class ContainerMetrics implements MetricsSource { + + @Metric + public MutableStat pMemMBsStat; + + static final MetricsInfo RECORD_INFO = + info("ContainerUsage", "Resource usage by container"); + + final MetricsInfo recordInfo; + final MetricsRegistry registry; + final ContainerId containerId; + final MetricsSystem metricsSystem; + + // Metrics publishing status + private long flushPeriodMs; + private boolean flushOnPeriod = false; // true if period elapsed + private boolean finished = false; // true if container finished + private boolean unregister = false; // unregister + private Timer timer; // lazily initialized + + /** + * Simple metrics cache to help prevent re-registrations. + */ + protected final static Map + usageMetrics = new HashMap<>(); + + ContainerMetrics( + MetricsSystem ms, ContainerId containerId, long flushPeriodMs) { + this.recordInfo = + info(sourceName(containerId), RECORD_INFO.description()); + this.registry = new MetricsRegistry(recordInfo); + this.metricsSystem = ms; + this.containerId = containerId; + this.flushPeriodMs = flushPeriodMs; + scheduleTimerTaskIfRequired(); + + this.pMemMBsStat = registry.newStat( + "pMem", "Physical memory stats", "Usage", "MBs", true); + } + + ContainerMetrics tag(MetricsInfo info, ContainerId containerId) { + registry.tag(info, containerId.toString()); + return this; + } + + static String sourceName(ContainerId containerId) { + return RECORD_INFO.name() + "_" + containerId.toString(); + } + + public static ContainerMetrics forContainer(ContainerId containerId) { + return forContainer(containerId, -1L); + } + + public static ContainerMetrics forContainer( + ContainerId containerId, long flushPeriodMs) { + return forContainer( + DefaultMetricsSystem.instance(), containerId, flushPeriodMs); + } + + synchronized static ContainerMetrics forContainer( + MetricsSystem ms, ContainerId containerId, long flushPeriodMs) { + ContainerMetrics metrics = usageMetrics.get(containerId); + if (metrics == null) { + metrics = new ContainerMetrics( + ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId); + + // Register with the MetricsSystems + if (ms != null) { + metrics = + ms.register(sourceName(containerId), + "Metrics for container: " + containerId, metrics); + } + usageMetrics.put(containerId, metrics); + } + + return metrics; + } + + @Override + public synchronized void getMetrics(MetricsCollector collector, boolean all) { + //Container goes through registered -> finished -> unregistered. + if (unregister) { + metricsSystem.unregisterSource(recordInfo.name()); + usageMetrics.remove(containerId); + return; + } + + if (finished || flushOnPeriod) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + if (finished) { + this.unregister = true; + } else if (flushOnPeriod) { + flushOnPeriod = false; + scheduleTimerTaskIfRequired(); + } + } + + public synchronized void finished() { + this.finished = true; + if (timer != null) { + timer.cancel(); + timer = null; + } + } + + public void recordMemoryUsage(int memoryMBs) { + this.pMemMBsStat.add(memoryMBs); + } + + private synchronized void scheduleTimerTaskIfRequired() { + if (flushPeriodMs > 0) { + // Lazily initialize timer + if (timer == null) { + this.timer = new Timer("Metrics flush checker", true); + } + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + synchronized (ContainerMetrics.this) { + if (!finished) { + flushOnPeriod = true; + } + } + } + }; + timer.schedule(timerTask, flushPeriodMs); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 02a63acfb1a..0ae432515b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -51,6 +51,8 @@ public class ContainersMonitorImpl extends AbstractService implements private long monitoringInterval; private MonitoringThread monitoringThread; + private boolean containerMetricsEnabled; + private long containerMetricsPeriodMs; final List containersToBeRemoved; final Map containersToBeAdded; @@ -106,6 +108,13 @@ public class ContainersMonitorImpl extends AbstractService implements LOG.info(" Using ResourceCalculatorProcessTree : " + this.processTreeClass); + this.containerMetricsEnabled = + conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE, + YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE); + this.containerMetricsPeriodMs = + conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS, + YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS); + long configuredPMemForContainers = conf.getLong( YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l; @@ -352,6 +361,9 @@ public class ContainersMonitorImpl extends AbstractService implements // Remove finished containers synchronized (containersToBeRemoved) { for (ContainerId containerId : containersToBeRemoved) { + if (containerMetricsEnabled) { + ContainerMetrics.forContainer(containerId).finished(); + } trackingContainers.remove(containerId); LOG.info("Stopping resource-monitoring for " + containerId); } @@ -408,7 +420,15 @@ public class ContainersMonitorImpl extends AbstractService implements LOG.info(String.format( "Memory usage of ProcessTree %s for container-id %s: ", pId, containerId.toString()) + - formatUsageString(currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit)); + formatUsageString( + currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit)); + + // Add usage to container metrics + if (containerMetricsEnabled) { + ContainerMetrics.forContainer( + containerId, containerMetricsPeriodMs).recordMemoryUsage( + (int) (currentPmemUsage >> 20)); + } boolean isMemoryOverLimit = false; String msg = ""; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java new file mode 100644 index 00000000000..158e2a88ac5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import java.util.Timer; + +public class TestContainerMetrics { + + @Test + public void testContainerMetricsFlow() throws InterruptedException { + final String ERR = "Error in number of records"; + + // Create a dummy MetricsSystem + MetricsSystem system = mock(MetricsSystem.class); + doReturn(this).when(system).register(anyString(), anyString(), any()); + + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + ContainerId containerId = mock(ContainerId.class); + ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100); + + metrics.recordMemoryUsage(1024); + metrics.getMetrics(collector, true); + assertEquals(ERR, 0, collector.getRecords().size()); + + Thread.sleep(110); + metrics.getMetrics(collector, true); + assertEquals(ERR, 1, collector.getRecords().size()); + collector.clear(); + + Thread.sleep(110); + metrics.getMetrics(collector, true); + assertEquals(ERR, 1, collector.getRecords().size()); + collector.clear(); + + metrics.finished(); + metrics.getMetrics(collector, true); + assertEquals(ERR, 1, collector.getRecords().size()); + collector.clear(); + + metrics.getMetrics(collector, true); + assertEquals(ERR, 0, collector.getRecords().size()); + + Thread.sleep(110); + metrics.getMetrics(collector, true); + assertEquals(ERR, 0, collector.getRecords().size()); + } +}