From 9f6ed41b952790880e5c6da5c767e06069a37b47 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 2 Oct 2015 20:19:14 +0000 Subject: [PATCH] YARN-3619. ContainerMetrics unregisters during getMetrics and leads to ConcurrentModificationException. Contributed by Zhihai Xu --- .../metrics2/impl/MetricsSystemImpl.java | 3 +- hadoop-yarn-project/CHANGES.txt | 3 ++ .../hadoop/yarn/conf/YarnConfiguration.java | 10 ++++- .../src/main/resources/yarn-default.xml | 8 ++++ .../monitor/ContainerMetrics.java | 38 ++++++++++++---- .../monitor/ContainersMonitorImpl.java | 16 +++++-- .../monitor/TestContainerMetrics.java | 45 ++++++++++++++++++- 7 files changed, 107 insertions(+), 16 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java index b7f264b4d26..cbdfdbd2227 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java @@ -398,7 +398,8 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { * Sample all the sources for a snapshot of metrics/tags * @return the metrics buffer containing the snapshot */ - synchronized MetricsBuffer sampleMetrics() { + @VisibleForTesting + public synchronized MetricsBuffer sampleMetrics() { collector.clear(); MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder(); diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e7452c436a5..901109f80cf 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -91,6 +91,9 @@ Release 2.7.2 - UNRELEASED YARN-3727. For better error recovery, check if the directory exists before using it for localization. (Zhihai Xu via jlowe) + YARN-3619. ContainerMetrics unregisters during getMetrics and leads to + ConcurrentModificationException (Zhihai Xu via jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6748df76b11..c029d90ab10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -882,7 +882,15 @@ public class YarnConfiguration extends Configuration { NM_PREFIX + "container-metrics.period-ms"; @Private public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1; - + + /** The delay time ms to unregister container metrics after completion. */ + @Private + public static final String NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS = + NM_PREFIX + "container-metrics.unregister-delay-ms"; + @Private + public static final int DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS = + 10000; + /** Prefix for all node manager disk health checker configs. */ private static final String NM_DISK_HEALTH_CHECK_PREFIX = "yarn.nodemanager.disk-health-checker."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index de6906ed1b8..7b2be61c59e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1224,6 +1224,14 @@ ${hadoop.tmp.dir}/yarn-nm-recovery + + + The delay time ms to unregister container metrics after completion. + + yarn.nodemanager.container-metrics.unregister-delay-ms + 10000 + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java index ffa72a415d5..365fe841cd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java @@ -90,6 +90,7 @@ public class ContainerMetrics implements MetricsSource { private boolean flushOnPeriod = false; // true if period elapsed private boolean finished = false; // true if container finished private boolean unregister = false; // unregister + private long unregisterDelayMs; private Timer timer; // lazily initialized /** @@ -97,15 +98,21 @@ public class ContainerMetrics implements MetricsSource { */ protected final static Map usageMetrics = new HashMap<>(); + // Create a timer to unregister container metrics, + // whose associated thread run as a daemon. + private final static Timer unregisterContainerMetricsTimer = + new Timer("Container metrics unregistration", true); ContainerMetrics( - MetricsSystem ms, ContainerId containerId, long flushPeriodMs) { + MetricsSystem ms, ContainerId containerId, long flushPeriodMs, + long delayMs) { this.recordInfo = info(sourceName(containerId), RECORD_INFO.description()); this.registry = new MetricsRegistry(recordInfo); this.metricsSystem = ms; this.containerId = containerId; this.flushPeriodMs = flushPeriodMs; + this.unregisterDelayMs = delayMs < 0 ? 0 : delayMs; scheduleTimerTaskIfRequired(); this.pMemMBsStat = registry.newStat( @@ -134,17 +141,18 @@ public class ContainerMetrics implements MetricsSource { } public static ContainerMetrics forContainer( - ContainerId containerId, long flushPeriodMs) { + ContainerId containerId, long flushPeriodMs, long delayMs) { return forContainer( - DefaultMetricsSystem.instance(), containerId, flushPeriodMs); + DefaultMetricsSystem.instance(), containerId, flushPeriodMs, delayMs); } synchronized static ContainerMetrics forContainer( - MetricsSystem ms, ContainerId containerId, long flushPeriodMs) { + MetricsSystem ms, ContainerId containerId, long flushPeriodMs, + long delayMs) { ContainerMetrics metrics = usageMetrics.get(containerId); if (metrics == null) { - metrics = new ContainerMetrics( - ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId); + metrics = new ContainerMetrics(ms, containerId, flushPeriodMs, + delayMs).tag(RECORD_INFO, containerId); // Register with the MetricsSystems if (ms != null) { @@ -158,12 +166,15 @@ public class ContainerMetrics implements MetricsSource { return metrics; } + synchronized static void unregisterContainerMetrics(ContainerMetrics cm) { + cm.metricsSystem.unregisterSource(cm.recordInfo.name()); + usageMetrics.remove(cm.containerId); + } + @Override public synchronized void getMetrics(MetricsCollector collector, boolean all) { //Container goes through registered -> finished -> unregistered. if (unregister) { - metricsSystem.unregisterSource(recordInfo.name()); - usageMetrics.remove(containerId); return; } @@ -185,6 +196,7 @@ public class ContainerMetrics implements MetricsSource { timer.cancel(); timer = null; } + scheduleTimerTaskForUnregistration(); } public void recordMemoryUsage(int memoryMBs) { @@ -232,4 +244,14 @@ public class ContainerMetrics implements MetricsSource { timer.schedule(timerTask, flushPeriodMs); } } + + private void scheduleTimerTaskForUnregistration() { + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + ContainerMetrics.unregisterContainerMetrics(ContainerMetrics.this); + } + }; + unregisterContainerMetricsTimer.schedule(timerTask, unregisterDelayMs); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 51530517563..20d2112a78d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -54,6 +54,7 @@ public class ContainersMonitorImpl extends AbstractService implements private MonitoringThread monitoringThread; private boolean containerMetricsEnabled; private long containerMetricsPeriodMs; + private long containerMetricsUnregisterDelayMs; final List containersToBeRemoved; final Map containersToBeAdded; @@ -116,6 +117,9 @@ public class ContainersMonitorImpl extends AbstractService implements this.containerMetricsPeriodMs = conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS, YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS); + this.containerMetricsUnregisterDelayMs = conf.getLong( + YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS, + YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS); long configuredPMemForContainers = conf.getLong( YarnConfiguration.NM_PMEM_MB, @@ -379,7 +383,8 @@ public class ContainersMonitorImpl extends AbstractService implements for (ContainerId containerId : containersToBeRemoved) { if (containerMetricsEnabled) { ContainerMetrics.forContainer( - containerId, containerMetricsPeriodMs).finished(); + containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs).finished(); } trackingContainers.remove(containerId); LOG.info("Stopping resource-monitoring for " + containerId); @@ -417,7 +422,8 @@ public class ContainersMonitorImpl extends AbstractService implements if (containerMetricsEnabled) { ContainerMetrics usageMetrics = ContainerMetrics - .forContainer(containerId, containerMetricsPeriodMs); + .forContainer(containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs); int cpuVcores = ptInfo.getCpuVcores(); final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20); final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20); @@ -464,10 +470,12 @@ public class ContainersMonitorImpl extends AbstractService implements // Add usage to container metrics if (containerMetricsEnabled) { ContainerMetrics.forContainer( - containerId, containerMetricsPeriodMs).recordMemoryUsage( + containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs).recordMemoryUsage( (int) (currentPmemUsage >> 20)); ContainerMetrics.forContainer( - containerId, containerMetricsPeriodMs).recordCpuUsage + containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs).recordCpuUsage ((int)cpuUsagePercentPerCore, milliVcoresUsed); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java index c6286486083..ec06856e878 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java @@ -22,11 +22,15 @@ import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.impl.MetricsRecords; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; @@ -44,7 +48,8 @@ public class TestContainerMetrics { MetricsCollectorImpl collector = new MetricsCollectorImpl(); ContainerId containerId = mock(ContainerId.class); - ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100); + ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, + 100, 1); metrics.recordMemoryUsage(1024); metrics.getMetrics(collector, true); @@ -82,7 +87,8 @@ public class TestContainerMetrics { MetricsCollectorImpl collector = new MetricsCollectorImpl(); ContainerId containerId = mock(ContainerId.class); - ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100); + ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, + 100, 1); int anyPmemLimit = 1024; int anyVmemLimit = 2048; @@ -107,4 +113,39 @@ public class TestContainerMetrics { collector.clear(); } + + @Test + public void testContainerMetricsFinished() throws InterruptedException { + MetricsSystemImpl system = new MetricsSystemImpl(); + system.init("test"); + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + ApplicationId appId = ApplicationId.newInstance(1234, 3); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 4); + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); + ContainerMetrics metrics1 = ContainerMetrics.forContainer(system, + containerId1, 1, 0); + ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2); + ContainerMetrics metrics2 = ContainerMetrics.forContainer(system, + containerId2, 1, 0); + ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3); + ContainerMetrics metrics3 = ContainerMetrics.forContainer(system, + containerId3, 1, 0); + metrics1.finished(); + metrics2.finished(); + system.sampleMetrics(); + system.sampleMetrics(); + Thread.sleep(100); + system.stop(); + // verify metrics1 is unregistered + assertTrue(metrics1 != ContainerMetrics.forContainer( + system, containerId1, 1, 0)); + // verify metrics2 is unregistered + assertTrue(metrics2 != ContainerMetrics.forContainer( + system, containerId2, 1, 0)); + // verify metrics3 is still registered + assertTrue(metrics3 == ContainerMetrics.forContainer( + system, containerId3, 1, 0)); + system.shutdown(); + } }