YARN-5190. Registering/unregistering container metrics in ContainerMonitorImpl and ContainerImpl causing uncaught exception in ContainerMonitorImpl. Contributed by Junping Du
This commit is contained in:
parent
097baaaeba
commit
99cc439e29
|
@ -255,6 +255,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
|
|||
if (namedCallbacks.containsKey(name)) {
|
||||
namedCallbacks.remove(name);
|
||||
}
|
||||
DefaultMetricsSystem.removeSourceName(name);
|
||||
}
|
||||
|
||||
synchronized
|
||||
|
|
|
@ -115,6 +115,11 @@ public enum DefaultMetricsSystem {
|
|||
INSTANCE.removeObjectName(name.toString());
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public static void removeSourceName(String name) {
|
||||
INSTANCE.removeSource(name);
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public static String sourceName(String name, boolean dupOK) {
|
||||
return INSTANCE.newSourceName(name, dupOK);
|
||||
|
@ -135,6 +140,10 @@ public enum DefaultMetricsSystem {
|
|||
mBeanNames.map.remove(name);
|
||||
}
|
||||
|
||||
synchronized void removeSource(String name) {
|
||||
sourceNames.map.remove(name);
|
||||
}
|
||||
|
||||
synchronized String newSourceName(String name, boolean dupOK) {
|
||||
if (sourceNames.map.containsKey(name)) {
|
||||
if (dupOK) {
|
||||
|
|
|
@ -198,6 +198,12 @@ public class ContainerMetrics implements MetricsSource {
|
|||
DefaultMetricsSystem.instance(), containerId, flushPeriodMs, delayMs);
|
||||
}
|
||||
|
||||
public synchronized static ContainerMetrics getContainerMetrics(
|
||||
ContainerId containerId) {
|
||||
// could be null
|
||||
return usageMetrics.get(containerId);
|
||||
}
|
||||
|
||||
synchronized static ContainerMetrics forContainer(
|
||||
MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
|
||||
long delayMs) {
|
||||
|
@ -237,12 +243,14 @@ public class ContainerMetrics implements MetricsSource {
|
|||
}
|
||||
|
||||
public synchronized void finished() {
|
||||
this.finished = true;
|
||||
if (timer != null) {
|
||||
timer.cancel();
|
||||
timer = null;
|
||||
if (!finished) {
|
||||
this.finished = true;
|
||||
if (timer != null) {
|
||||
timer.cancel();
|
||||
timer = null;
|
||||
}
|
||||
scheduleTimerTaskForUnregistration();
|
||||
}
|
||||
scheduleTimerTaskForUnregistration();
|
||||
}
|
||||
|
||||
public void recordMemoryUsage(int memoryMBs) {
|
||||
|
|
|
@ -619,15 +619,16 @@ public class ContainersMonitorImpl extends AbstractService implements
|
|||
}
|
||||
|
||||
ContainerId containerId = monitoringEvent.getContainerId();
|
||||
ContainerMetrics usageMetrics = ContainerMetrics
|
||||
.forContainer(containerId, containerMetricsPeriodMs,
|
||||
containerMetricsUnregisterDelayMs);
|
||||
ContainerMetrics usageMetrics;
|
||||
|
||||
int vmemLimitMBs;
|
||||
int pmemLimitMBs;
|
||||
int cpuVcores;
|
||||
switch (monitoringEvent.getType()) {
|
||||
case START_MONITORING_CONTAINER:
|
||||
usageMetrics = ContainerMetrics
|
||||
.forContainer(containerId, containerMetricsPeriodMs,
|
||||
containerMetricsUnregisterDelayMs);
|
||||
ContainerStartMonitoringEvent startEvent =
|
||||
(ContainerStartMonitoringEvent) monitoringEvent;
|
||||
usageMetrics.recordStateChangeDurations(
|
||||
|
@ -640,9 +641,16 @@ public class ContainersMonitorImpl extends AbstractService implements
|
|||
vmemLimitMBs, pmemLimitMBs, cpuVcores);
|
||||
break;
|
||||
case STOP_MONITORING_CONTAINER:
|
||||
usageMetrics.finished();
|
||||
usageMetrics = ContainerMetrics.getContainerMetrics(
|
||||
containerId);
|
||||
if (usageMetrics != null) {
|
||||
usageMetrics.finished();
|
||||
}
|
||||
break;
|
||||
case CHANGE_MONITORING_CONTAINER_RESOURCE:
|
||||
usageMetrics = ContainerMetrics
|
||||
.forContainer(containerId, containerMetricsPeriodMs,
|
||||
containerMetricsUnregisterDelayMs);
|
||||
ChangeMonitoringContainerResourceEvent changeEvent =
|
||||
(ChangeMonitoringContainerResourceEvent) monitoringEvent;
|
||||
Resource resource = changeEvent.getResource();
|
||||
|
|
|
@ -142,7 +142,6 @@ public class TestContainerMetrics {
|
|||
system.sampleMetrics();
|
||||
system.sampleMetrics();
|
||||
Thread.sleep(100);
|
||||
system.stop();
|
||||
// verify metrics1 is unregistered
|
||||
assertTrue(metrics1 != ContainerMetrics.forContainer(
|
||||
system, containerId1, 1, 0));
|
||||
|
@ -152,6 +151,9 @@ public class TestContainerMetrics {
|
|||
// verify metrics3 is still registered
|
||||
assertTrue(metrics3 == ContainerMetrics.forContainer(
|
||||
system, containerId3, 1, 0));
|
||||
// YARN-5190: move stop() to the end to verify registering containerId1 and
|
||||
// containerId2 won't get MetricsException thrown.
|
||||
system.stop();
|
||||
system.shutdown();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue