From 1942364ef14396e9bd94a87c0d901ff9abe1d42a Mon Sep 17 00:00:00 2001 From: cnauroth Date: Thu, 18 Sep 2014 15:36:43 -0700 Subject: [PATCH] HADOOP-11105. MetricsSystemImpl could leak memory in registered callbacks. Contributed by Chuan Liu. --- .../metrics2/impl/MetricsSystemImpl.java | 35 +++++++++++++++---- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java index 722abd95c4a..2107e68895b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java @@ -83,7 +83,12 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { private final Map allSources; private final Map sinks; private final Map allSinks; + + // The callback list is used by register(Callback callback), while + // the callback map is used by register(String name, String desc, T sink) private final List callbacks; + private final Map namedCallbacks; + private final MetricsCollectorImpl collector; private final MetricsRegistry registry = new MetricsRegistry(MS_NAME); @Metric({"Snapshot", "Snapshot stats"}) MutableStat snapshotStat; @@ -119,6 +124,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { sourceConfigs = Maps.newHashMap(); sinkConfigs = Maps.newHashMap(); callbacks = Lists.newArrayList(); + namedCallbacks = Maps.newHashMap(); injectedTags = Lists.newArrayList(); collector = new MetricsCollectorImpl(); if (prefix != null) { @@ -178,11 +184,13 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { return; } for (Callback cb : callbacks) cb.preStart(); + for (Callback cb : namedCallbacks.values()) cb.preStart(); configure(prefix); startTimer(); monitoring = true; LOG.info(prefix +" metrics system started"); for (Callback cb : callbacks) cb.postStart(); + for (Callback cb : namedCallbacks.values()) cb.postStart(); } @Override @@ -198,6 +206,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { return; } for (Callback cb : callbacks) cb.preStop(); + for (Callback cb : namedCallbacks.values()) cb.preStop(); LOG.info("Stopping "+ prefix +" metrics system..."); stopTimer(); stopSources(); @@ -206,6 +215,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { monitoring = false; LOG.info(prefix +" metrics system stopped."); for (Callback cb : callbacks) cb.postStop(); + for (Callback cb : namedCallbacks.values()) cb.postStop(); } @Override public synchronized @@ -224,7 +234,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { } // We want to re-register the source to pick up new config when the // metrics system restarts. - register(new AbstractCallback() { + register(name, new AbstractCallback() { @Override public void postStart() { registerSource(finalName, finalDesc, s); } @@ -241,6 +251,9 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { if (allSources.containsKey(name)) { allSources.remove(name); } + if (namedCallbacks.containsKey(name)) { + namedCallbacks.remove(name); + } } synchronized @@ -268,7 +281,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { } // We want to re-register the sink to pick up new config // when the metrics system restarts. - register(new AbstractCallback() { + register(name, new AbstractCallback() { @Override public void postStart() { register(name, description, sink); } @@ -289,9 +302,16 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { @Override public synchronized void register(final Callback callback) { - callbacks.add((Callback) Proxy.newProxyInstance( - callback.getClass().getClassLoader(), new Class[] { Callback.class }, - new InvocationHandler() { + callbacks.add((Callback) getProxyForCallback(callback)); + } + + private synchronized void register(String name, final Callback callback) { + namedCallbacks.put(name, (Callback) getProxyForCallback(callback)); + } + + private Object getProxyForCallback(final Callback callback) { + return Proxy.newProxyInstance(callback.getClass().getClassLoader(), + new Class[] { Callback.class }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { @@ -299,11 +319,11 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { return method.invoke(callback, args); } catch (Exception e) { // These are not considered fatal. - LOG.warn("Caught exception in callback "+ method.getName(), e); + LOG.warn("Caught exception in callback " + method.getName(), e); } return null; } - })); + }); } @Override @@ -572,6 +592,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { allSources.clear(); allSinks.clear(); callbacks.clear(); + namedCallbacks.clear(); if (mbeanName != null) { MBeans.unregister(mbeanName); mbeanName = null;