diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java index 79562aee9e7..d9dbdbd5c06 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java @@ -236,8 +236,8 @@ public class DecayRpcScheduler implements RpcScheduler, DecayTask task = new DecayTask(this, timer); timer.scheduleAtFixedRate(task, decayPeriodMillis, decayPeriodMillis); - metricsProxy = MetricsProxy.getInstance(ns, numLevels); - metricsProxy.setDelegate(this); + metricsProxy = MetricsProxy.getInstance(ns, numLevels, this); + recomputeScheduleCache(); } // Load configs @@ -680,21 +680,26 @@ public class DecayRpcScheduler implements RpcScheduler, private long[] callCountInLastWindowDefault; private ObjectName decayRpcSchedulerInfoBeanName; - private MetricsProxy(String namespace, int numLevels) { + private MetricsProxy(String namespace, int numLevels, + DecayRpcScheduler drs) { averageResponseTimeDefault = new double[numLevels]; callCountInLastWindowDefault = new long[numLevels]; + setDelegate(drs); decayRpcSchedulerInfoBeanName = MBeans.register(namespace, "DecayRpcScheduler", this); this.registerMetrics2Source(namespace); } public static synchronized MetricsProxy getInstance(String namespace, - int numLevels) { + int numLevels, DecayRpcScheduler drs) { MetricsProxy mp = INSTANCES.get(namespace); if (mp == null) { // We must create one - mp = new MetricsProxy(namespace, numLevels); + mp = new MetricsProxy(namespace, numLevels, drs); INSTANCES.put(namespace, mp); + } else if (drs != mp.delegate.get()){ + // in case of delegate is reclaimed, we should set it again + mp.setDelegate(drs); } return mp; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java index 58380c54106..10ab40ace1f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java @@ -19,19 +19,22 @@ package org.apache.hadoop.ipc; import static java.lang.Thread.sleep; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import org.junit.Test; + +import static org.junit.Assert.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.conf.Configuration; import javax.management.MBeanServer; import javax.management.ObjectName; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; import java.lang.management.ManagementFactory; public class TestDecayRpcScheduler { @@ -248,4 +251,27 @@ public class TestDecayRpcScheduler { sleep(10); } } + + @Test(timeout=60000) + public void testNPEatInitialization() throws InterruptedException { + // redirect the LOG to and check if there is NPE message while initializing + // the DecayRpcScheduler + PrintStream output = System.out; + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + System.setOut(new PrintStream(bytes)); + + // initializing DefaultMetricsSystem here would set "monitoring" flag in + // MetricsSystemImpl to true + DefaultMetricsSystem.initialize("NameNode"); + Configuration conf = new Configuration(); + scheduler = new DecayRpcScheduler(1, "ns", conf); + // check if there is npe in log + assertFalse(bytes.toString().contains("NullPointerException")); + } finally { + //set systout back + System.setOut(output); + } + + } } \ No newline at end of file