YARN-5190. Registering/unregistering container metrics in ContainerMonitorImpl and ContainerImpl causing uncaught exception in ContainerMonitorImpl. Contributed by Junping Du

(cherry picked from commit 99cc439e29)
This commit is contained in:
Jian He 2016-06-03 11:10:42 -07:00
parent 01a3f7899c
commit 3c2bd19fa5
5 changed files with 38 additions and 10 deletions

View File

@ -255,6 +255,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
if (namedCallbacks.containsKey(name)) { if (namedCallbacks.containsKey(name)) {
namedCallbacks.remove(name); namedCallbacks.remove(name);
} }
DefaultMetricsSystem.removeSourceName(name);
} }
synchronized synchronized

View File

@ -107,6 +107,11 @@ public enum DefaultMetricsSystem {
INSTANCE.removeObjectName(name.toString()); INSTANCE.removeObjectName(name.toString());
} }
@InterfaceAudience.Private
public static void removeSourceName(String name) {
INSTANCE.removeSource(name);
}
@InterfaceAudience.Private @InterfaceAudience.Private
public static String sourceName(String name, boolean dupOK) { public static String sourceName(String name, boolean dupOK) {
return INSTANCE.newSourceName(name, dupOK); return INSTANCE.newSourceName(name, dupOK);
@ -127,6 +132,10 @@ public enum DefaultMetricsSystem {
mBeanNames.map.remove(name); mBeanNames.map.remove(name);
} }
synchronized void removeSource(String name) {
sourceNames.map.remove(name);
}
synchronized String newSourceName(String name, boolean dupOK) { synchronized String newSourceName(String name, boolean dupOK) {
if (sourceNames.map.containsKey(name)) { if (sourceNames.map.containsKey(name)) {
if (dupOK) { if (dupOK) {

View File

@ -160,6 +160,12 @@ public class ContainerMetrics implements MetricsSource {
DefaultMetricsSystem.instance(), containerId, flushPeriodMs, delayMs); DefaultMetricsSystem.instance(), containerId, flushPeriodMs, delayMs);
} }
public synchronized static ContainerMetrics getContainerMetrics(
ContainerId containerId) {
// could be null
return usageMetrics.get(containerId);
}
synchronized static ContainerMetrics forContainer( synchronized static ContainerMetrics forContainer(
MetricsSystem ms, ContainerId containerId, long flushPeriodMs, MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
long delayMs) { long delayMs) {
@ -205,12 +211,14 @@ public class ContainerMetrics implements MetricsSource {
} }
public synchronized void finished() { public synchronized void finished() {
this.finished = true; if (!finished) {
if (timer != null) { this.finished = true;
timer.cancel(); if (timer != null) {
timer = null; timer.cancel();
timer = null;
}
scheduleTimerTaskForUnregistration();
} }
scheduleTimerTaskForUnregistration();
} }
public void recordMemoryUsage(int memoryMBs) { public void recordMemoryUsage(int memoryMBs) {

View File

@ -615,15 +615,16 @@ public class ContainersMonitorImpl extends AbstractService implements
} }
ContainerId containerId = monitoringEvent.getContainerId(); ContainerId containerId = monitoringEvent.getContainerId();
ContainerMetrics usageMetrics = ContainerMetrics ContainerMetrics usageMetrics;
.forContainer(containerId, containerMetricsPeriodMs,
containerMetricsUnregisterDelayMs);
int vmemLimitMBs; int vmemLimitMBs;
int pmemLimitMBs; int pmemLimitMBs;
int cpuVcores; int cpuVcores;
switch (monitoringEvent.getType()) { switch (monitoringEvent.getType()) {
case START_MONITORING_CONTAINER: case START_MONITORING_CONTAINER:
usageMetrics = ContainerMetrics
.forContainer(containerId, containerMetricsPeriodMs,
containerMetricsUnregisterDelayMs);
ContainerStartMonitoringEvent startEvent = ContainerStartMonitoringEvent startEvent =
(ContainerStartMonitoringEvent) monitoringEvent; (ContainerStartMonitoringEvent) monitoringEvent;
usageMetrics.recordStateChangeDurations( usageMetrics.recordStateChangeDurations(
@ -636,9 +637,16 @@ public class ContainersMonitorImpl extends AbstractService implements
vmemLimitMBs, pmemLimitMBs, cpuVcores); vmemLimitMBs, pmemLimitMBs, cpuVcores);
break; break;
case STOP_MONITORING_CONTAINER: case STOP_MONITORING_CONTAINER:
usageMetrics.finished(); usageMetrics = ContainerMetrics.getContainerMetrics(
containerId);
if (usageMetrics != null) {
usageMetrics.finished();
}
break; break;
case CHANGE_MONITORING_CONTAINER_RESOURCE: case CHANGE_MONITORING_CONTAINER_RESOURCE:
usageMetrics = ContainerMetrics
.forContainer(containerId, containerMetricsPeriodMs,
containerMetricsUnregisterDelayMs);
ChangeMonitoringContainerResourceEvent changeEvent = ChangeMonitoringContainerResourceEvent changeEvent =
(ChangeMonitoringContainerResourceEvent) monitoringEvent; (ChangeMonitoringContainerResourceEvent) monitoringEvent;
Resource resource = changeEvent.getResource(); Resource resource = changeEvent.getResource();

View File

@ -146,7 +146,6 @@ public class TestContainerMetrics {
system.sampleMetrics(); system.sampleMetrics();
system.sampleMetrics(); system.sampleMetrics();
Thread.sleep(100); Thread.sleep(100);
system.stop();
// verify metrics1 is unregistered // verify metrics1 is unregistered
assertTrue(metrics1 != ContainerMetrics.forContainer( assertTrue(metrics1 != ContainerMetrics.forContainer(
system, containerId1, 1, 0)); system, containerId1, 1, 0));
@ -156,6 +155,9 @@ public class TestContainerMetrics {
// verify metrics3 is still registered // verify metrics3 is still registered
assertTrue(metrics3 == ContainerMetrics.forContainer( assertTrue(metrics3 == ContainerMetrics.forContainer(
system, containerId3, 1, 0)); system, containerId3, 1, 0));
// YARN-5190: move stop() to the end to verify registering containerId1 and
// containerId2 won't get MetricsException thrown.
system.stop();
system.shutdown(); system.shutdown();
} }
} }