From fdf02d1f26cea372bf69e071f57b8bfc09c092c4 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 2 Oct 2015 20:09:13 +0000 Subject: [PATCH] YARN-3619. ContainerMetrics unregisters during getMetrics and leads to ConcurrentModificationException. Contributed by Zhihai Xu --- .../metrics2/impl/MetricsSystemImpl.java | 3 +- hadoop-yarn-project/CHANGES.txt | 3 ++ .../hadoop/yarn/conf/YarnConfiguration.java | 10 ++++- .../src/main/resources/yarn-default.xml | 8 ++++ .../monitor/ContainerMetrics.java | 38 ++++++++++++---- .../monitor/ContainersMonitorImpl.java | 16 +++++-- .../monitor/TestContainerMetrics.java | 45 ++++++++++++++++++- 7 files changed, 107 insertions(+), 16 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java index 513d6d71fd2..ef7306b747e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java @@ -395,7 +395,8 @@ public synchronized void publishMetricsNow() { * Sample all the sources for a snapshot of metrics/tags * @return the metrics buffer containing the snapshot */ - synchronized MetricsBuffer sampleMetrics() { + @VisibleForTesting + public synchronized MetricsBuffer sampleMetrics() { collector.clear(); MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder(); diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index bf3ac12aae7..9e9522c4ee2 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1008,6 +1008,9 @@ Release 2.7.2 - UNRELEASED YARN-3727. For better error recovery, check if the directory exists before using it for localization. (Zhihai Xu via jlowe) + YARN-3619. ContainerMetrics unregisters during getMetrics and leads to + ConcurrentModificationException (Zhihai Xu via jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 33e8a1f0c49..d2106cd7888 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1008,7 +1008,15 @@ private static void addDeprecatedKeys() { NM_PREFIX + "container-metrics.period-ms"; @Private public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1; - + + /** The delay time ms to unregister container metrics after completion. */ + @Private + public static final String NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS = + NM_PREFIX + "container-metrics.unregister-delay-ms"; + @Private + public static final int DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS = + 10000; + /** Prefix for all node manager disk health checker configs. */ private static final String NM_DISK_HEALTH_CHECK_PREFIX = "yarn.nodemanager.disk-health-checker."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index bcd64c3e124..c8bca8ed728 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1571,6 +1571,14 @@ -1 + + + The delay time ms to unregister container metrics after completion. + + yarn.nodemanager.container-metrics.unregister-delay-ms + 10000 + + Class used to calculate current container resource utilization. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java index c364143241c..48128c16878 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java @@ -100,6 +100,7 @@ public class ContainerMetrics implements MetricsSource { private boolean flushOnPeriod = false; // true if period elapsed private boolean finished = false; // true if container finished private boolean unregister = false; // unregister + private long unregisterDelayMs; private Timer timer; // lazily initialized /** @@ -107,15 +108,21 @@ public class ContainerMetrics implements MetricsSource { */ protected final static Map usageMetrics = new HashMap<>(); + // Create a timer to unregister container metrics, + // whose associated thread run as a daemon. + private final static Timer unregisterContainerMetricsTimer = + new Timer("Container metrics unregistration", true); ContainerMetrics( - MetricsSystem ms, ContainerId containerId, long flushPeriodMs) { + MetricsSystem ms, ContainerId containerId, long flushPeriodMs, + long delayMs) { this.recordInfo = info(sourceName(containerId), RECORD_INFO.description()); this.registry = new MetricsRegistry(recordInfo); this.metricsSystem = ms; this.containerId = containerId; this.flushPeriodMs = flushPeriodMs; + this.unregisterDelayMs = delayMs < 0 ? 0 : delayMs; scheduleTimerTaskIfRequired(); this.pMemMBsStat = registry.newStat( @@ -148,17 +155,18 @@ static String sourceName(ContainerId containerId) { } public static ContainerMetrics forContainer( - ContainerId containerId, long flushPeriodMs) { + ContainerId containerId, long flushPeriodMs, long delayMs) { return forContainer( - DefaultMetricsSystem.instance(), containerId, flushPeriodMs); + DefaultMetricsSystem.instance(), containerId, flushPeriodMs, delayMs); } synchronized static ContainerMetrics forContainer( - MetricsSystem ms, ContainerId containerId, long flushPeriodMs) { + MetricsSystem ms, ContainerId containerId, long flushPeriodMs, + long delayMs) { ContainerMetrics metrics = usageMetrics.get(containerId); if (metrics == null) { - metrics = new ContainerMetrics( - ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId); + metrics = new ContainerMetrics(ms, containerId, flushPeriodMs, + delayMs).tag(RECORD_INFO, containerId); // Register with the MetricsSystems if (ms != null) { @@ -172,12 +180,15 @@ synchronized static ContainerMetrics forContainer( return metrics; } + synchronized static void unregisterContainerMetrics(ContainerMetrics cm) { + cm.metricsSystem.unregisterSource(cm.recordInfo.name()); + usageMetrics.remove(cm.containerId); + } + @Override public synchronized void getMetrics(MetricsCollector collector, boolean all) { //Container goes through registered -> finished -> unregistered. if (unregister) { - metricsSystem.unregisterSource(recordInfo.name()); - usageMetrics.remove(containerId); return; } @@ -199,6 +210,7 @@ public synchronized void finished() { timer.cancel(); timer = null; } + scheduleTimerTaskForUnregistration(); } public void recordMemoryUsage(int memoryMBs) { @@ -252,4 +264,14 @@ public void run() { timer.schedule(timerTask, flushPeriodMs); } } + + private void scheduleTimerTaskForUnregistration() { + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + ContainerMetrics.unregisterContainerMetrics(ContainerMetrics.this); + } + }; + unregisterContainerMetricsTimer.schedule(timerTask, unregisterDelayMs); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index b3839d2aa10..82ad53ed576 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -55,6 +55,7 @@ public class ContainersMonitorImpl extends AbstractService implements private MonitoringThread monitoringThread; private boolean containerMetricsEnabled; private long containerMetricsPeriodMs; + private long containerMetricsUnregisterDelayMs; @VisibleForTesting final Map trackingContainers = @@ -126,6 +127,9 @@ protected void serviceInit(Configuration conf) throws Exception { this.containerMetricsPeriodMs = conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS, YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS); + this.containerMetricsUnregisterDelayMs = conf.getLong( + YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS, + YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS); long configuredPMemForContainers = NodeManagerHardwareUtils.getContainerMemoryMB(conf) * 1024 * 1024L; @@ -425,7 +429,8 @@ public void run() { if (containerMetricsEnabled) { ContainerMetrics usageMetrics = ContainerMetrics - .forContainer(containerId, containerMetricsPeriodMs); + .forContainer(containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs); usageMetrics.recordProcessId(pId); } } @@ -476,10 +481,12 @@ public void run() { // Add usage to container metrics if (containerMetricsEnabled) { ContainerMetrics.forContainer( - containerId, containerMetricsPeriodMs).recordMemoryUsage( + containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs).recordMemoryUsage( (int) (currentPmemUsage >> 20)); ContainerMetrics.forContainer( - containerId, containerMetricsPeriodMs).recordCpuUsage + containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs).recordCpuUsage ((int)cpuUsagePercentPerCore, milliVcoresUsed); } @@ -609,7 +616,8 @@ private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) { ContainerId containerId = monitoringEvent.getContainerId(); ContainerMetrics usageMetrics = ContainerMetrics - .forContainer(containerId, containerMetricsPeriodMs); + .forContainer(containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs); int vmemLimitMBs; int pmemLimitMBs; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java index bdf99943568..2beb927e896 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java @@ -22,11 +22,15 @@ import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.impl.MetricsRecords; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; @@ -44,7 +48,8 @@ public void testContainerMetricsFlow() throws InterruptedException { MetricsCollectorImpl collector = new MetricsCollectorImpl(); ContainerId containerId = mock(ContainerId.class); - ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100); + ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, + 100, 1); metrics.recordMemoryUsage(1024); metrics.getMetrics(collector, true); @@ -82,7 +87,8 @@ public void testContainerMetricsLimit() throws InterruptedException { MetricsCollectorImpl collector = new MetricsCollectorImpl(); ContainerId containerId = mock(ContainerId.class); - ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100); + ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, + 100, 1); int anyPmemLimit = 1024; int anyVmemLimit = 2048; @@ -117,4 +123,39 @@ public void testContainerMetricsLimit() throws InterruptedException { collector.clear(); } + + @Test + public void testContainerMetricsFinished() throws InterruptedException { + MetricsSystemImpl system = new MetricsSystemImpl(); + system.init("test"); + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + ApplicationId appId = ApplicationId.newInstance(1234, 3); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 4); + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); + ContainerMetrics metrics1 = ContainerMetrics.forContainer(system, + containerId1, 1, 0); + ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2); + ContainerMetrics metrics2 = ContainerMetrics.forContainer(system, + containerId2, 1, 0); + ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3); + ContainerMetrics metrics3 = ContainerMetrics.forContainer(system, + containerId3, 1, 0); + metrics1.finished(); + metrics2.finished(); + system.sampleMetrics(); + system.sampleMetrics(); + Thread.sleep(100); + system.stop(); + // verify metrics1 is unregistered + assertTrue(metrics1 != ContainerMetrics.forContainer( + system, containerId1, 1, 0)); + // verify metrics2 is unregistered + assertTrue(metrics2 != ContainerMetrics.forContainer( + system, containerId2, 1, 0)); + // verify metrics3 is still registered + assertTrue(metrics3 == ContainerMetrics.forContainer( + system, containerId3, 1, 0)); + system.shutdown(); + } }