YARN-3619. ContainerMetrics unregisters during getMetrics and leads to ConcurrentModificationException. Contributed by Zhihai Xu
This commit is contained in:
parent
b2bc72a051
commit
9f6ed41b95
|
@ -398,7 +398,8 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
|
||||||
* Sample all the sources for a snapshot of metrics/tags
|
* Sample all the sources for a snapshot of metrics/tags
|
||||||
* @return the metrics buffer containing the snapshot
|
* @return the metrics buffer containing the snapshot
|
||||||
*/
|
*/
|
||||||
synchronized MetricsBuffer sampleMetrics() {
|
@VisibleForTesting
|
||||||
|
public synchronized MetricsBuffer sampleMetrics() {
|
||||||
collector.clear();
|
collector.clear();
|
||||||
MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder();
|
MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder();
|
||||||
|
|
||||||
|
|
|
@ -91,6 +91,9 @@ Release 2.7.2 - UNRELEASED
|
||||||
YARN-3727. For better error recovery, check if the directory exists before
|
YARN-3727. For better error recovery, check if the directory exists before
|
||||||
using it for localization. (Zhihai Xu via jlowe)
|
using it for localization. (Zhihai Xu via jlowe)
|
||||||
|
|
||||||
|
YARN-3619. ContainerMetrics unregisters during getMetrics and leads to
|
||||||
|
ConcurrentModificationException (Zhihai Xu via jlowe)
|
||||||
|
|
||||||
Release 2.7.1 - 2015-07-06
|
Release 2.7.1 - 2015-07-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -882,7 +882,15 @@ public class YarnConfiguration extends Configuration {
|
||||||
NM_PREFIX + "container-metrics.period-ms";
|
NM_PREFIX + "container-metrics.period-ms";
|
||||||
@Private
|
@Private
|
||||||
public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1;
|
public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1;
|
||||||
|
|
||||||
|
/** The delay, in milliseconds, before container metrics are unregistered after the container completes. */
|
||||||
|
@Private
|
||||||
|
public static final String NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS =
|
||||||
|
NM_PREFIX + "container-metrics.unregister-delay-ms";
|
||||||
|
@Private
|
||||||
|
public static final int DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS =
|
||||||
|
10000;
|
||||||
|
|
||||||
/** Prefix for all node manager disk health checker configs. */
|
/** Prefix for all node manager disk health checker configs. */
|
||||||
private static final String NM_DISK_HEALTH_CHECK_PREFIX =
|
private static final String NM_DISK_HEALTH_CHECK_PREFIX =
|
||||||
"yarn.nodemanager.disk-health-checker.";
|
"yarn.nodemanager.disk-health-checker.";
|
||||||
|
|
|
@ -1224,6 +1224,14 @@
|
||||||
<value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
|
<value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
The delay, in milliseconds, before container metrics are unregistered after the container completes.
|
||||||
|
</description>
|
||||||
|
<name>yarn.nodemanager.container-metrics.unregister-delay-ms</name>
|
||||||
|
<value>10000</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<!--Docker configuration-->
|
<!--Docker configuration-->
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
|
|
@ -90,6 +90,7 @@ public class ContainerMetrics implements MetricsSource {
|
||||||
private boolean flushOnPeriod = false; // true if period elapsed
|
private boolean flushOnPeriod = false; // true if period elapsed
|
||||||
private boolean finished = false; // true if container finished
|
private boolean finished = false; // true if container finished
|
||||||
private boolean unregister = false; // unregister
|
private boolean unregister = false; // unregister
|
||||||
|
private long unregisterDelayMs;
|
||||||
private Timer timer; // lazily initialized
|
private Timer timer; // lazily initialized
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -97,15 +98,21 @@ public class ContainerMetrics implements MetricsSource {
|
||||||
*/
|
*/
|
||||||
protected final static Map<ContainerId, ContainerMetrics>
|
protected final static Map<ContainerId, ContainerMetrics>
|
||||||
usageMetrics = new HashMap<>();
|
usageMetrics = new HashMap<>();
|
||||||
|
// Create a timer to unregister container metrics,
|
||||||
|
// whose associated thread runs as a daemon.
|
||||||
|
private final static Timer unregisterContainerMetricsTimer =
|
||||||
|
new Timer("Container metrics unregistration", true);
|
||||||
|
|
||||||
ContainerMetrics(
|
ContainerMetrics(
|
||||||
MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
|
MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
|
||||||
|
long delayMs) {
|
||||||
this.recordInfo =
|
this.recordInfo =
|
||||||
info(sourceName(containerId), RECORD_INFO.description());
|
info(sourceName(containerId), RECORD_INFO.description());
|
||||||
this.registry = new MetricsRegistry(recordInfo);
|
this.registry = new MetricsRegistry(recordInfo);
|
||||||
this.metricsSystem = ms;
|
this.metricsSystem = ms;
|
||||||
this.containerId = containerId;
|
this.containerId = containerId;
|
||||||
this.flushPeriodMs = flushPeriodMs;
|
this.flushPeriodMs = flushPeriodMs;
|
||||||
|
this.unregisterDelayMs = delayMs < 0 ? 0 : delayMs;
|
||||||
scheduleTimerTaskIfRequired();
|
scheduleTimerTaskIfRequired();
|
||||||
|
|
||||||
this.pMemMBsStat = registry.newStat(
|
this.pMemMBsStat = registry.newStat(
|
||||||
|
@ -134,17 +141,18 @@ public class ContainerMetrics implements MetricsSource {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ContainerMetrics forContainer(
|
public static ContainerMetrics forContainer(
|
||||||
ContainerId containerId, long flushPeriodMs) {
|
ContainerId containerId, long flushPeriodMs, long delayMs) {
|
||||||
return forContainer(
|
return forContainer(
|
||||||
DefaultMetricsSystem.instance(), containerId, flushPeriodMs);
|
DefaultMetricsSystem.instance(), containerId, flushPeriodMs, delayMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized static ContainerMetrics forContainer(
|
synchronized static ContainerMetrics forContainer(
|
||||||
MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
|
MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
|
||||||
|
long delayMs) {
|
||||||
ContainerMetrics metrics = usageMetrics.get(containerId);
|
ContainerMetrics metrics = usageMetrics.get(containerId);
|
||||||
if (metrics == null) {
|
if (metrics == null) {
|
||||||
metrics = new ContainerMetrics(
|
metrics = new ContainerMetrics(ms, containerId, flushPeriodMs,
|
||||||
ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId);
|
delayMs).tag(RECORD_INFO, containerId);
|
||||||
|
|
||||||
// Register with the MetricsSystems
|
// Register with the MetricsSystems
|
||||||
if (ms != null) {
|
if (ms != null) {
|
||||||
|
@ -158,12 +166,15 @@ public class ContainerMetrics implements MetricsSource {
|
||||||
return metrics;
|
return metrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
synchronized static void unregisterContainerMetrics(ContainerMetrics cm) {
|
||||||
|
cm.metricsSystem.unregisterSource(cm.recordInfo.name());
|
||||||
|
usageMetrics.remove(cm.containerId);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void getMetrics(MetricsCollector collector, boolean all) {
|
public synchronized void getMetrics(MetricsCollector collector, boolean all) {
|
||||||
//Container goes through registered -> finished -> unregistered.
|
//Container goes through registered -> finished -> unregistered.
|
||||||
if (unregister) {
|
if (unregister) {
|
||||||
metricsSystem.unregisterSource(recordInfo.name());
|
|
||||||
usageMetrics.remove(containerId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,6 +196,7 @@ public class ContainerMetrics implements MetricsSource {
|
||||||
timer.cancel();
|
timer.cancel();
|
||||||
timer = null;
|
timer = null;
|
||||||
}
|
}
|
||||||
|
scheduleTimerTaskForUnregistration();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void recordMemoryUsage(int memoryMBs) {
|
public void recordMemoryUsage(int memoryMBs) {
|
||||||
|
@ -232,4 +244,14 @@ public class ContainerMetrics implements MetricsSource {
|
||||||
timer.schedule(timerTask, flushPeriodMs);
|
timer.schedule(timerTask, flushPeriodMs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void scheduleTimerTaskForUnregistration() {
|
||||||
|
TimerTask timerTask = new TimerTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
ContainerMetrics.unregisterContainerMetrics(ContainerMetrics.this);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
unregisterContainerMetricsTimer.schedule(timerTask, unregisterDelayMs);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,6 +54,7 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||||
private MonitoringThread monitoringThread;
|
private MonitoringThread monitoringThread;
|
||||||
private boolean containerMetricsEnabled;
|
private boolean containerMetricsEnabled;
|
||||||
private long containerMetricsPeriodMs;
|
private long containerMetricsPeriodMs;
|
||||||
|
private long containerMetricsUnregisterDelayMs;
|
||||||
|
|
||||||
final List<ContainerId> containersToBeRemoved;
|
final List<ContainerId> containersToBeRemoved;
|
||||||
final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
|
final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
|
||||||
|
@ -116,6 +117,9 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||||
this.containerMetricsPeriodMs =
|
this.containerMetricsPeriodMs =
|
||||||
conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
|
conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
|
||||||
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
|
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
|
||||||
|
this.containerMetricsUnregisterDelayMs = conf.getLong(
|
||||||
|
YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
|
||||||
|
|
||||||
long configuredPMemForContainers = conf.getLong(
|
long configuredPMemForContainers = conf.getLong(
|
||||||
YarnConfiguration.NM_PMEM_MB,
|
YarnConfiguration.NM_PMEM_MB,
|
||||||
|
@ -379,7 +383,8 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||||
for (ContainerId containerId : containersToBeRemoved) {
|
for (ContainerId containerId : containersToBeRemoved) {
|
||||||
if (containerMetricsEnabled) {
|
if (containerMetricsEnabled) {
|
||||||
ContainerMetrics.forContainer(
|
ContainerMetrics.forContainer(
|
||||||
containerId, containerMetricsPeriodMs).finished();
|
containerId, containerMetricsPeriodMs,
|
||||||
|
containerMetricsUnregisterDelayMs).finished();
|
||||||
}
|
}
|
||||||
trackingContainers.remove(containerId);
|
trackingContainers.remove(containerId);
|
||||||
LOG.info("Stopping resource-monitoring for " + containerId);
|
LOG.info("Stopping resource-monitoring for " + containerId);
|
||||||
|
@ -417,7 +422,8 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||||
|
|
||||||
if (containerMetricsEnabled) {
|
if (containerMetricsEnabled) {
|
||||||
ContainerMetrics usageMetrics = ContainerMetrics
|
ContainerMetrics usageMetrics = ContainerMetrics
|
||||||
.forContainer(containerId, containerMetricsPeriodMs);
|
.forContainer(containerId, containerMetricsPeriodMs,
|
||||||
|
containerMetricsUnregisterDelayMs);
|
||||||
int cpuVcores = ptInfo.getCpuVcores();
|
int cpuVcores = ptInfo.getCpuVcores();
|
||||||
final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20);
|
final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20);
|
||||||
final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20);
|
final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20);
|
||||||
|
@ -464,10 +470,12 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||||
// Add usage to container metrics
|
// Add usage to container metrics
|
||||||
if (containerMetricsEnabled) {
|
if (containerMetricsEnabled) {
|
||||||
ContainerMetrics.forContainer(
|
ContainerMetrics.forContainer(
|
||||||
containerId, containerMetricsPeriodMs).recordMemoryUsage(
|
containerId, containerMetricsPeriodMs,
|
||||||
|
containerMetricsUnregisterDelayMs).recordMemoryUsage(
|
||||||
(int) (currentPmemUsage >> 20));
|
(int) (currentPmemUsage >> 20));
|
||||||
ContainerMetrics.forContainer(
|
ContainerMetrics.forContainer(
|
||||||
containerId, containerMetricsPeriodMs).recordCpuUsage
|
containerId, containerMetricsPeriodMs,
|
||||||
|
containerMetricsUnregisterDelayMs).recordCpuUsage
|
||||||
((int)cpuUsagePercentPerCore, milliVcoresUsed);
|
((int)cpuUsagePercentPerCore, milliVcoresUsed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,11 +22,15 @@ import org.apache.hadoop.metrics2.MetricsRecord;
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
||||||
import org.apache.hadoop.metrics2.impl.MetricsRecords;
|
import org.apache.hadoop.metrics2.impl.MetricsRecords;
|
||||||
|
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyString;
|
import static org.mockito.Matchers.anyString;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
@ -44,7 +48,8 @@ public class TestContainerMetrics {
|
||||||
|
|
||||||
MetricsCollectorImpl collector = new MetricsCollectorImpl();
|
MetricsCollectorImpl collector = new MetricsCollectorImpl();
|
||||||
ContainerId containerId = mock(ContainerId.class);
|
ContainerId containerId = mock(ContainerId.class);
|
||||||
ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100);
|
ContainerMetrics metrics = ContainerMetrics.forContainer(containerId,
|
||||||
|
100, 1);
|
||||||
|
|
||||||
metrics.recordMemoryUsage(1024);
|
metrics.recordMemoryUsage(1024);
|
||||||
metrics.getMetrics(collector, true);
|
metrics.getMetrics(collector, true);
|
||||||
|
@ -82,7 +87,8 @@ public class TestContainerMetrics {
|
||||||
|
|
||||||
MetricsCollectorImpl collector = new MetricsCollectorImpl();
|
MetricsCollectorImpl collector = new MetricsCollectorImpl();
|
||||||
ContainerId containerId = mock(ContainerId.class);
|
ContainerId containerId = mock(ContainerId.class);
|
||||||
ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100);
|
ContainerMetrics metrics = ContainerMetrics.forContainer(containerId,
|
||||||
|
100, 1);
|
||||||
|
|
||||||
int anyPmemLimit = 1024;
|
int anyPmemLimit = 1024;
|
||||||
int anyVmemLimit = 2048;
|
int anyVmemLimit = 2048;
|
||||||
|
@ -107,4 +113,39 @@ public class TestContainerMetrics {
|
||||||
|
|
||||||
collector.clear();
|
collector.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerMetricsFinished() throws InterruptedException {
|
||||||
|
MetricsSystemImpl system = new MetricsSystemImpl();
|
||||||
|
system.init("test");
|
||||||
|
MetricsCollectorImpl collector = new MetricsCollectorImpl();
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(1234, 3);
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
ApplicationAttemptId.newInstance(appId, 4);
|
||||||
|
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
|
||||||
|
ContainerMetrics metrics1 = ContainerMetrics.forContainer(system,
|
||||||
|
containerId1, 1, 0);
|
||||||
|
ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2);
|
||||||
|
ContainerMetrics metrics2 = ContainerMetrics.forContainer(system,
|
||||||
|
containerId2, 1, 0);
|
||||||
|
ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3);
|
||||||
|
ContainerMetrics metrics3 = ContainerMetrics.forContainer(system,
|
||||||
|
containerId3, 1, 0);
|
||||||
|
metrics1.finished();
|
||||||
|
metrics2.finished();
|
||||||
|
system.sampleMetrics();
|
||||||
|
system.sampleMetrics();
|
||||||
|
Thread.sleep(100);
|
||||||
|
system.stop();
|
||||||
|
// verify metrics1 is unregistered
|
||||||
|
assertTrue(metrics1 != ContainerMetrics.forContainer(
|
||||||
|
system, containerId1, 1, 0));
|
||||||
|
// verify metrics2 is unregistered
|
||||||
|
assertTrue(metrics2 != ContainerMetrics.forContainer(
|
||||||
|
system, containerId2, 1, 0));
|
||||||
|
// verify metrics3 is still registered
|
||||||
|
assertTrue(metrics3 == ContainerMetrics.forContainer(
|
||||||
|
system, containerId3, 1, 0));
|
||||||
|
system.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue