From effd1093b559aeba2bf66a4cf81cd4a0013de184 Mon Sep 17 00:00:00 2001 From: Reid Chan Date: Tue, 15 Aug 2017 15:50:22 +0800 Subject: [PATCH] HBASE-17064 Add TaskMonitor#getTasks() variant which accepts type selection Signed-off-by: tedyu --- .../hbase/tmpl/common/TaskMonitorTmpl.jamon | 21 +--- .../hadoop/hbase/monitoring/TaskMonitor.java | 97 ++++++++++++++++--- .../hbase/monitoring/TestTaskMonitor.java | 48 +++++++++ 3 files changed, 133 insertions(+), 33 deletions(-) diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon index b4a5feae456..986bc3a4555 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon @@ -27,27 +27,8 @@ String filter = "general"; String format = "html"; <%java> -List tasks = taskMonitor.getTasks(); -Iterator iter = tasks.iterator(); // apply requested filter -while (iter.hasNext()) { - MonitoredTask t = iter.next(); - if (filter.equals("general")) { - if (t instanceof MonitoredRPCHandler) - iter.remove(); - } else if (filter.equals("handler")) { - if (!(t instanceof MonitoredRPCHandler)) - iter.remove(); - } else if (filter.equals("rpc")) { - if (!(t instanceof MonitoredRPCHandler) || - !((MonitoredRPCHandler) t).isRPCRunning()) - iter.remove(); - } else if (filter.equals("operation")) { - if (!(t instanceof MonitoredRPCHandler) || - !((MonitoredRPCHandler) t).isOperationRunning()) - iter.remove(); - } -} +List tasks = taskMonitor.getTasks(filter); long now = System.currentTimeMillis(); Collections.reverse(tasks); boolean first = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java index 780916f9698..ad9bd0225ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java @@ -157,22 +157,52 @@ public class TaskMonitor { * MonitoredTasks handled by this TaskMonitor. * @return A complete list of MonitoredTasks. */ - public synchronized List getTasks() { + public List getTasks() { + return getTasks(null); + } + + /** + * Produces a list containing copies of the current state of all non-expired + * MonitoredTasks handled by this TaskMonitor. + * @param filter type of wanted tasks + * @return A filtered list of MonitoredTasks. + */ + public synchronized List getTasks(String filter) { purgeExpiredTasks(); - ArrayList ret = Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size()); - for (Iterator it = tasks.iterator(); - it.hasNext();) { - TaskAndWeakRefPair pair = it.next(); - MonitoredTask t = pair.get(); - ret.add(t.clone()); + TaskFilter taskFilter = createTaskFilter(filter); + ArrayList results = + Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size()); + processTasks(tasks, taskFilter, results); + processTasks(rpcTasks, taskFilter, results); + return results; + } + + /** + * Create a task filter according to a given filter type. + * @param filter type of monitored task + * @return a task filter + */ + private static TaskFilter createTaskFilter(String filter) { + switch (TaskFilter.TaskType.getTaskType(filter)) { + case GENERAL: return task -> task instanceof MonitoredRPCHandler; + case HANDLER: return task -> !(task instanceof MonitoredRPCHandler); + case RPC: return task -> !(task instanceof MonitoredRPCHandler) || + !((MonitoredRPCHandler) task).isRPCRunning(); + case OPERATION: return task -> !(task instanceof MonitoredRPCHandler) || + !((MonitoredRPCHandler) task).isOperationRunning(); + default: return task -> false; } - for (Iterator it = rpcTasks.iterator(); - it.hasNext();) { - TaskAndWeakRefPair pair = it.next(); - MonitoredTask t = pair.get(); - ret.add(t.clone()); + } + + private static void processTasks(Iterable tasks, + TaskFilter filter, + List results) { + for (TaskAndWeakRefPair task : tasks) { + MonitoredTask t = task.get(); + if (!filter.filter(t)) { + results.add(t.clone()); + } } - return ret; } private boolean canPurge(MonitoredTask stat) { @@ -280,4 +310,45 @@ public class TaskMonitor { } } } + + private interface TaskFilter { + enum TaskType { + GENERAL("general"), + HANDLER("handler"), + RPC("rpc"), + OPERATION("operation"), + ALL("all"); + + private String type; + + private TaskType(String type) { + this.type = type.toLowerCase(); + } + + static TaskType getTaskType(String type) { + if (type == null || type.isEmpty()) { + return ALL; + } + type = type.toLowerCase(); + for (TaskType taskType : values()) { + if (taskType.toString().equals(type)) { + return taskType; + } + } + return ALL; + } + + @Override + public String toString() { + return type; + } + } + + /** + * Filter out unwanted task. + * @param task monitored task + * @return false if a task is accepted, true if it is filtered + */ + boolean filter(MonitoredTask t); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java index 718339afb22..7abcde8f9c2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java @@ -20,9 +20,15 @@ package org.apache.hadoop.hbase.monitoring; import static org.junit.Assert.*; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Query; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -143,5 +149,47 @@ public class TestTaskMonitor { tm.shutdown(); } + @Test + public void testGetTasksWithFilter() throws Exception { + TaskMonitor tm = new TaskMonitor(new Configuration()); + assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); + // Create 5 general tasks + tm.createStatus("General task1"); + tm.createStatus("General task2"); + tm.createStatus("General task3"); + tm.createStatus("General task4"); + tm.createStatus("General task5"); + // Create 5 rpc tasks, and mark 1 completed + int length = 5; + ArrayList rpcHandlers = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i); + rpcHandlers.add(rpcHandler); + } + // Create rpc opertions + byte[] row = new byte[] { 0x01 }; + Mutation m = new Put(row); + Query q = new Scan(); + String notOperation = "for test"; + rpcHandlers.get(0).setRPC("operations", new Object[]{ m, q }, 3000); + rpcHandlers.get(1).setRPC("operations", new Object[]{ m, q }, 3000); + rpcHandlers.get(2).setRPC("operations", new Object[]{ m, q }, 3000); + rpcHandlers.get(3).setRPC("operations", new Object[]{ notOperation }, 3000); + rpcHandlers.get(4).setRPC("operations", new Object[]{ m, q }, 3000); + MonitoredRPCHandler completed = rpcHandlers.get(4); + completed.markComplete("Completed!"); + // Test get tasks with filter + List generalTasks = tm.getTasks("general"); + assertEquals(5, generalTasks.size()); + List handlerTasks = tm.getTasks("handler"); + assertEquals(5, handlerTasks.size()); + List rpcTasks = tm.getTasks("rpc"); + // The last rpc handler is stopped + assertEquals(4, rpcTasks.size()); + List operationTasks = tm.getTasks("operation"); + // Handler 3 doesn't handle Operation. + assertEquals(3, operationTasks.size()); + tm.shutdown(); + } }