HBASE-17064 Add TaskMonitor#getTasks() variant which accepts type selection

Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
Reid Chan 2017-08-15 15:50:22 +08:00 committed by tedyu
parent 5073bd6e04
commit f1376213ac
3 changed files with 133 additions and 33 deletions

View File

@ -27,27 +27,8 @@ String filter = "general";
String format = "html"; String format = "html";
</%args> </%args>
<%java> <%java>
List<? extends MonitoredTask> tasks = taskMonitor.getTasks();
Iterator<? extends MonitoredTask> iter = tasks.iterator();
// apply requested filter // apply requested filter
while (iter.hasNext()) { List<? extends MonitoredTask> tasks = taskMonitor.getTasks(filter);
MonitoredTask t = iter.next();
if (filter.equals("general")) {
if (t instanceof MonitoredRPCHandler)
iter.remove();
} else if (filter.equals("handler")) {
if (!(t instanceof MonitoredRPCHandler))
iter.remove();
} else if (filter.equals("rpc")) {
if (!(t instanceof MonitoredRPCHandler) ||
!((MonitoredRPCHandler) t).isRPCRunning())
iter.remove();
} else if (filter.equals("operation")) {
if (!(t instanceof MonitoredRPCHandler) ||
!((MonitoredRPCHandler) t).isOperationRunning())
iter.remove();
}
}
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
Collections.reverse(tasks); Collections.reverse(tasks);
boolean first = true; boolean first = true;

View File

@ -157,22 +157,52 @@ public class TaskMonitor {
* MonitoredTasks handled by this TaskMonitor. * MonitoredTasks handled by this TaskMonitor.
* @return A complete list of MonitoredTasks. * @return A complete list of MonitoredTasks.
*/ */
public synchronized List<MonitoredTask> getTasks() { public List<MonitoredTask> getTasks() {
return getTasks(null);
}
/**
* Produces a list containing copies of the current state of all non-expired
* MonitoredTasks handled by this TaskMonitor.
* @param filter type of wanted tasks
* @return A filtered list of MonitoredTasks.
*/
public synchronized List<MonitoredTask> getTasks(String filter) {
purgeExpiredTasks(); purgeExpiredTasks();
ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size()); TaskFilter taskFilter = createTaskFilter(filter);
for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); ArrayList<MonitoredTask> results =
it.hasNext();) { Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
TaskAndWeakRefPair pair = it.next(); processTasks(tasks, taskFilter, results);
MonitoredTask t = pair.get(); processTasks(rpcTasks, taskFilter, results);
ret.add(t.clone()); return results;
}
/**
* Create a task filter according to a given filter type.
* @param filter type of monitored task
* @return a task filter
*/
private static TaskFilter createTaskFilter(String filter) {
switch (TaskFilter.TaskType.getTaskType(filter)) {
case GENERAL: return task -> task instanceof MonitoredRPCHandler;
case HANDLER: return task -> !(task instanceof MonitoredRPCHandler);
case RPC: return task -> !(task instanceof MonitoredRPCHandler) ||
!((MonitoredRPCHandler) task).isRPCRunning();
case OPERATION: return task -> !(task instanceof MonitoredRPCHandler) ||
!((MonitoredRPCHandler) task).isOperationRunning();
default: return task -> false;
}
}
private static void processTasks(Iterable<TaskAndWeakRefPair> tasks,
TaskFilter filter,
List<MonitoredTask> results) {
for (TaskAndWeakRefPair task : tasks) {
MonitoredTask t = task.get();
if (!filter.filter(t)) {
results.add(t.clone());
} }
for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator();
it.hasNext();) {
TaskAndWeakRefPair pair = it.next();
MonitoredTask t = pair.get();
ret.add(t.clone());
} }
return ret;
} }
private boolean canPurge(MonitoredTask stat) { private boolean canPurge(MonitoredTask stat) {
@ -280,4 +310,45 @@ public class TaskMonitor {
} }
} }
} }
private interface TaskFilter {
enum TaskType {
GENERAL("general"),
HANDLER("handler"),
RPC("rpc"),
OPERATION("operation"),
ALL("all");
private String type;
private TaskType(String type) {
this.type = type.toLowerCase();
}
static TaskType getTaskType(String type) {
if (type == null || type.isEmpty()) {
return ALL;
}
type = type.toLowerCase();
for (TaskType taskType : values()) {
if (taskType.toString().equals(type)) {
return taskType;
}
}
return ALL;
}
@Override
public String toString() {
return type;
}
}
/**
* Filter out unwanted task.
* @param task monitored task
* @return false if a task is accepted, true if it is filtered
*/
boolean filter(MonitoredTask t);
}
} }

View File

@ -20,9 +20,15 @@ package org.apache.hadoop.hbase.monitoring;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -143,5 +149,47 @@ public class TestTaskMonitor {
tm.shutdown(); tm.shutdown();
} }
@Test
public void testGetTasksWithFilter() throws Exception {
TaskMonitor tm = new TaskMonitor(new Configuration());
assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
// Create 5 general tasks
tm.createStatus("General task1");
tm.createStatus("General task2");
tm.createStatus("General task3");
tm.createStatus("General task4");
tm.createStatus("General task5");
// Create 5 rpc tasks, and mark 1 completed
int length = 5;
ArrayList<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i);
rpcHandlers.add(rpcHandler);
}
// Create rpc opertions
byte[] row = new byte[] { 0x01 };
Mutation m = new Put(row);
Query q = new Scan();
String notOperation = "for test";
rpcHandlers.get(0).setRPC("operations", new Object[]{ m, q }, 3000);
rpcHandlers.get(1).setRPC("operations", new Object[]{ m, q }, 3000);
rpcHandlers.get(2).setRPC("operations", new Object[]{ m, q }, 3000);
rpcHandlers.get(3).setRPC("operations", new Object[]{ notOperation }, 3000);
rpcHandlers.get(4).setRPC("operations", new Object[]{ m, q }, 3000);
MonitoredRPCHandler completed = rpcHandlers.get(4);
completed.markComplete("Completed!");
// Test get tasks with filter
List<MonitoredTask> generalTasks = tm.getTasks("general");
assertEquals(5, generalTasks.size());
List<MonitoredTask> handlerTasks = tm.getTasks("handler");
assertEquals(5, handlerTasks.size());
List<MonitoredTask> rpcTasks = tm.getTasks("rpc");
// The last rpc handler is stopped
assertEquals(4, rpcTasks.size());
List<MonitoredTask> operationTasks = tm.getTasks("operation");
// Handler 3 doesn't handle Operation.
assertEquals(3, operationTasks.size());
tm.shutdown();
}
} }