HADOOP-15121. Encounter NullPointerException when using DecayRpcScheduler. Contributed by Tao Jie.

(cherry picked from commit 3fde0f1db5)
This commit is contained in:
Hanisha Koneru 2018-01-22 15:54:44 -08:00 committed by Jason Lowe
parent 913417bbea
commit 4c54ddd1c2
2 changed files with 39 additions and 8 deletions

View File

@ -236,8 +236,8 @@ public class DecayRpcScheduler implements RpcScheduler,
DecayTask task = new DecayTask(this, timer); DecayTask task = new DecayTask(this, timer);
timer.scheduleAtFixedRate(task, decayPeriodMillis, decayPeriodMillis); timer.scheduleAtFixedRate(task, decayPeriodMillis, decayPeriodMillis);
metricsProxy = MetricsProxy.getInstance(ns, numLevels); metricsProxy = MetricsProxy.getInstance(ns, numLevels, this);
metricsProxy.setDelegate(this); recomputeScheduleCache();
} }
// Load configs // Load configs
@ -680,21 +680,26 @@ public class DecayRpcScheduler implements RpcScheduler,
private long[] callCountInLastWindowDefault; private long[] callCountInLastWindowDefault;
private ObjectName decayRpcSchedulerInfoBeanName; private ObjectName decayRpcSchedulerInfoBeanName;
private MetricsProxy(String namespace, int numLevels) { private MetricsProxy(String namespace, int numLevels,
DecayRpcScheduler drs) {
averageResponseTimeDefault = new double[numLevels]; averageResponseTimeDefault = new double[numLevels];
callCountInLastWindowDefault = new long[numLevels]; callCountInLastWindowDefault = new long[numLevels];
setDelegate(drs);
decayRpcSchedulerInfoBeanName = decayRpcSchedulerInfoBeanName =
MBeans.register(namespace, "DecayRpcScheduler", this); MBeans.register(namespace, "DecayRpcScheduler", this);
this.registerMetrics2Source(namespace); this.registerMetrics2Source(namespace);
} }
public static synchronized MetricsProxy getInstance(String namespace, public static synchronized MetricsProxy getInstance(String namespace,
int numLevels) { int numLevels, DecayRpcScheduler drs) {
MetricsProxy mp = INSTANCES.get(namespace); MetricsProxy mp = INSTANCES.get(namespace);
if (mp == null) { if (mp == null) {
// We must create one // We must create one
mp = new MetricsProxy(namespace, numLevels); mp = new MetricsProxy(namespace, numLevels, drs);
INSTANCES.put(namespace, mp); INSTANCES.put(namespace, mp);
} else if (drs != mp.delegate.get()){
// in case of delegate is reclaimed, we should set it again
mp.setDelegate(drs);
} }
return mp; return mp;
} }

View File

@ -19,19 +19,22 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import static java.lang.Thread.sleep; import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.ObjectName; import javax.management.ObjectName;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
public class TestDecayRpcScheduler { public class TestDecayRpcScheduler {
@ -248,4 +251,27 @@ public class TestDecayRpcScheduler {
sleep(10); sleep(10);
} }
} }
@Test(timeout=60000)
public void testNPEatInitialization() throws InterruptedException {
// redirect the LOG to and check if there is NPE message while initializing
// the DecayRpcScheduler
PrintStream output = System.out;
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
System.setOut(new PrintStream(bytes));
// initializing DefaultMetricsSystem here would set "monitoring" flag in
// MetricsSystemImpl to true
DefaultMetricsSystem.initialize("NameNode");
Configuration conf = new Configuration();
scheduler = new DecayRpcScheduler(1, "ns", conf);
// check if there is npe in log
assertFalse(bytes.toString().contains("NullPointerException"));
} finally {
//set systout back
System.setOut(output);
}
}
} }