HADOOP-13285. DecayRpcScheduler MXBean should only report decayed CallVolumeSummary. Contributed by Xiaoyu Yao.

(cherry picked from commit 0761379fe4)
This commit is contained in:
Xiaoyu Yao 2016-06-17 15:25:14 -07:00
parent 69217ba813
commit cc49133281
2 changed files with 41 additions and 6 deletions

View File

@ -901,9 +901,24 @@ public class DecayRpcScheduler implements RpcScheduler,
public String getCallVolumeSummary() { public String getCallVolumeSummary() {
try { try {
ObjectMapper om = new ObjectMapper(); ObjectMapper om = new ObjectMapper();
return om.writeValueAsString(callCounts); return om.writeValueAsString(getDecayedCallCounts());
} catch (Exception e) { } catch (Exception e) {
return "Error: " + e.getMessage(); return "Error: " + e.getMessage();
} }
} }
private Map<Object, Long> getDecayedCallCounts() {
Map<Object, Long> decayedCallCounts = new HashMap<>(callCounts.size());
Iterator<Map.Entry<Object, List<AtomicLong>>> it =
callCounts.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Object, List<AtomicLong>> entry = it.next();
Object user = entry.getKey();
Long decayedCount = entry.getValue().get(0).get();
if (decayedCount > 0) {
decayedCallCounts.put(user, decayedCount);
}
}
return decayedCallCounts;
}
} }

View File

@ -30,6 +30,10 @@ import static org.mockito.Mockito.when;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
public class TestDecayRpcScheduler { public class TestDecayRpcScheduler {
private Schedulable mockCall(String id) { private Schedulable mockCall(String id) {
Schedulable mockCall = mock(Schedulable.class); Schedulable mockCall = mock(Schedulable.class);
@ -189,12 +193,14 @@ public class TestDecayRpcScheduler {
@Test @Test
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public void testPriority() { public void testPriority() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush final String namespace = "ns";
conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, conf.set(namespace + "." + DecayRpcScheduler
"25, 50, 75"); .IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
scheduler = new DecayRpcScheduler(4, "ns", conf); conf.set(namespace + "." + DecayRpcScheduler
.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "25, 50, 75");
scheduler = new DecayRpcScheduler(4, namespace, conf);
assertEquals(0, scheduler.getPriorityLevel(mockCall("A"))); assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
assertEquals(2, scheduler.getPriorityLevel(mockCall("A"))); assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
@ -206,6 +212,20 @@ public class TestDecayRpcScheduler {
assertEquals(1, scheduler.getPriorityLevel(mockCall("A"))); assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
assertEquals(1, scheduler.getPriorityLevel(mockCall("A"))); assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
assertEquals(2, scheduler.getPriorityLevel(mockCall("A"))); assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxbeanName = new ObjectName(
"Hadoop:service="+ namespace + ",name=DecayRpcScheduler");
String cvs1 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary");
assertTrue("Get expected JMX of CallVolumeSummary before decay",
cvs1.equals("{\"A\":6,\"B\":2,\"C\":2}"));
scheduler.forceDecay();
String cvs2 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary");
assertTrue("Get expected JMX for CallVolumeSummary after decay",
cvs2.equals("{\"A\":3,\"B\":1,\"C\":1}"));
} }
@Test(timeout=2000) @Test(timeout=2000)