diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java index 4e75eb60052..949b036401a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java @@ -53,6 +53,7 @@ public class TaskMonitor { private static TaskMonitor instance; private CircularFifoBuffer tasks = new CircularFifoBuffer(MAX_TASKS); + private List rpcTasks = Lists.newArrayList(); /** * Get singleton instance. @@ -88,7 +89,7 @@ public class TaskMonitor { new Class[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler(stat)); TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy); - tasks.add(pair); + rpcTasks.add(pair); return proxy; } @@ -120,13 +121,19 @@ public class TaskMonitor { */ public synchronized List getTasks() { purgeExpiredTasks(); - ArrayList ret = Lists.newArrayListWithCapacity(tasks.size()); + ArrayList ret = Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size()); for (Iterator it = tasks.iterator(); it.hasNext();) { TaskAndWeakRefPair pair = it.next(); MonitoredTask t = pair.get(); ret.add(t.clone()); } + for (Iterator it = rpcTasks.iterator(); + it.hasNext();) { + TaskAndWeakRefPair pair = it.next(); + MonitoredTask t = pair.get(); + ret.add(t.clone()); + } return ret; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java index e54d0f6f0d2..5464d9fd7a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java @@ -101,6 +101,27 @@ public class TestTaskMonitor { assertEquals("task 10", tm.getTasks().get(0).getDescription()); } + @Test + public void testDoNotPurgeRPCTask() throws Exception { + int RPCTaskNums = 10; + for(int i = 0; i < RPCTaskNums; i++) { + TaskMonitor.get().createRPCStatus("PRCTask" + i); + } + for(int i = 0; i < TaskMonitor.MAX_TASKS; i++) { + TaskMonitor.get().createStatus("otherTask" + i); + } + int remainRPCTask = 0; + for(MonitoredTask task :TaskMonitor.get().getTasks()) { + if(task instanceof MonitoredRPCHandler) { + remainRPCTask++; + } + } + assertEquals("RPC Tasks have been purged!", RPCTaskNums, remainRPCTask); + + } + + + }