HADOOP-11105. MetricsSystemImpl could leak memory in registered callbacks. Contributed by Chuan Liu.

This commit is contained in:
cnauroth 2014-09-18 15:36:43 -07:00
parent 2c3da25fd7
commit 1942364ef1
1 changed files with 28 additions and 7 deletions

View File

@ -83,7 +83,12 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
private final Map<String, MetricsSource> allSources;
private final Map<String, MetricsSinkAdapter> sinks;
private final Map<String, MetricsSink> allSinks;
// The callback list is used by register(Callback callback), while
// the callback map is used by register(String name, String desc, T sink)
private final List<Callback> callbacks;
private final Map<String, Callback> namedCallbacks;
private final MetricsCollectorImpl collector;
private final MetricsRegistry registry = new MetricsRegistry(MS_NAME);
@Metric({"Snapshot", "Snapshot stats"}) MutableStat snapshotStat;
@ -119,6 +124,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
sourceConfigs = Maps.newHashMap();
sinkConfigs = Maps.newHashMap();
callbacks = Lists.newArrayList();
namedCallbacks = Maps.newHashMap();
injectedTags = Lists.newArrayList();
collector = new MetricsCollectorImpl();
if (prefix != null) {
@ -178,11 +184,13 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
return;
}
for (Callback cb : callbacks) cb.preStart();
for (Callback cb : namedCallbacks.values()) cb.preStart();
configure(prefix);
startTimer();
monitoring = true;
LOG.info(prefix +" metrics system started");
for (Callback cb : callbacks) cb.postStart();
for (Callback cb : namedCallbacks.values()) cb.postStart();
}
@Override
@ -198,6 +206,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
return;
}
for (Callback cb : callbacks) cb.preStop();
for (Callback cb : namedCallbacks.values()) cb.preStop();
LOG.info("Stopping "+ prefix +" metrics system...");
stopTimer();
stopSources();
@ -206,6 +215,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
monitoring = false;
LOG.info(prefix +" metrics system stopped.");
for (Callback cb : callbacks) cb.postStop();
for (Callback cb : namedCallbacks.values()) cb.postStop();
}
@Override public synchronized <T>
@ -224,7 +234,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
}
// We want to re-register the source to pick up new config when the
// metrics system restarts.
register(new AbstractCallback() {
register(name, new AbstractCallback() {
@Override public void postStart() {
registerSource(finalName, finalDesc, s);
}
@ -241,6 +251,9 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
if (allSources.containsKey(name)) {
allSources.remove(name);
}
if (namedCallbacks.containsKey(name)) {
namedCallbacks.remove(name);
}
}
synchronized
@ -268,7 +281,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
}
// We want to re-register the sink to pick up new config
// when the metrics system restarts.
register(new AbstractCallback() {
register(name, new AbstractCallback() {
@Override public void postStart() {
register(name, description, sink);
}
@ -289,9 +302,16 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
@Override
public synchronized void register(final Callback callback) {
callbacks.add((Callback) Proxy.newProxyInstance(
callback.getClass().getClassLoader(), new Class<?>[] { Callback.class },
new InvocationHandler() {
callbacks.add((Callback) getProxyForCallback(callback));
}
private synchronized void register(String name, final Callback callback) {
namedCallbacks.put(name, (Callback) getProxyForCallback(callback));
}
private Object getProxyForCallback(final Callback callback) {
return Proxy.newProxyInstance(callback.getClass().getClassLoader(),
new Class<?>[] { Callback.class }, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
@ -299,11 +319,11 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
return method.invoke(callback, args);
} catch (Exception e) {
// These are not considered fatal.
LOG.warn("Caught exception in callback "+ method.getName(), e);
LOG.warn("Caught exception in callback " + method.getName(), e);
}
return null;
}
}));
});
}
@Override
@ -572,6 +592,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
allSources.clear();
allSinks.clear();
callbacks.clear();
namedCallbacks.clear();
if (mbeanName != null) {
MBeans.unregister(mbeanName);
mbeanName = null;