HBASE-17064 Add TaskMonitor#getTasks() variant which accepts type selection
Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
parent
d37266f63c
commit
effd1093b5
|
@ -27,27 +27,8 @@ String filter = "general";
|
|||
String format = "html";
|
||||
</%args>
|
||||
<%java>
|
||||
List<? extends MonitoredTask> tasks = taskMonitor.getTasks();
|
||||
Iterator<? extends MonitoredTask> iter = tasks.iterator();
|
||||
// apply requested filter
|
||||
while (iter.hasNext()) {
|
||||
MonitoredTask t = iter.next();
|
||||
if (filter.equals("general")) {
|
||||
if (t instanceof MonitoredRPCHandler)
|
||||
iter.remove();
|
||||
} else if (filter.equals("handler")) {
|
||||
if (!(t instanceof MonitoredRPCHandler))
|
||||
iter.remove();
|
||||
} else if (filter.equals("rpc")) {
|
||||
if (!(t instanceof MonitoredRPCHandler) ||
|
||||
!((MonitoredRPCHandler) t).isRPCRunning())
|
||||
iter.remove();
|
||||
} else if (filter.equals("operation")) {
|
||||
if (!(t instanceof MonitoredRPCHandler) ||
|
||||
!((MonitoredRPCHandler) t).isOperationRunning())
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
List<? extends MonitoredTask> tasks = taskMonitor.getTasks(filter);
|
||||
long now = System.currentTimeMillis();
|
||||
Collections.reverse(tasks);
|
||||
boolean first = true;
|
||||
|
|
|
@ -157,22 +157,52 @@ public class TaskMonitor {
|
|||
* MonitoredTasks handled by this TaskMonitor.
|
||||
* @return A complete list of MonitoredTasks.
|
||||
*/
|
||||
public synchronized List<MonitoredTask> getTasks() {
|
||||
public List<MonitoredTask> getTasks() {
|
||||
return getTasks(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Produces a list containing copies of the current state of all non-expired
|
||||
* MonitoredTasks handled by this TaskMonitor.
|
||||
* @param filter type of wanted tasks
|
||||
* @return A filtered list of MonitoredTasks.
|
||||
*/
|
||||
public synchronized List<MonitoredTask> getTasks(String filter) {
|
||||
purgeExpiredTasks();
|
||||
ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
|
||||
for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
|
||||
it.hasNext();) {
|
||||
TaskAndWeakRefPair pair = it.next();
|
||||
MonitoredTask t = pair.get();
|
||||
ret.add(t.clone());
|
||||
TaskFilter taskFilter = createTaskFilter(filter);
|
||||
ArrayList<MonitoredTask> results =
|
||||
Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
|
||||
processTasks(tasks, taskFilter, results);
|
||||
processTasks(rpcTasks, taskFilter, results);
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a task filter according to a given filter type.
|
||||
* @param filter type of monitored task
|
||||
* @return a task filter
|
||||
*/
|
||||
private static TaskFilter createTaskFilter(String filter) {
|
||||
switch (TaskFilter.TaskType.getTaskType(filter)) {
|
||||
case GENERAL: return task -> task instanceof MonitoredRPCHandler;
|
||||
case HANDLER: return task -> !(task instanceof MonitoredRPCHandler);
|
||||
case RPC: return task -> !(task instanceof MonitoredRPCHandler) ||
|
||||
!((MonitoredRPCHandler) task).isRPCRunning();
|
||||
case OPERATION: return task -> !(task instanceof MonitoredRPCHandler) ||
|
||||
!((MonitoredRPCHandler) task).isOperationRunning();
|
||||
default: return task -> false;
|
||||
}
|
||||
}
|
||||
|
||||
private static void processTasks(Iterable<TaskAndWeakRefPair> tasks,
|
||||
TaskFilter filter,
|
||||
List<MonitoredTask> results) {
|
||||
for (TaskAndWeakRefPair task : tasks) {
|
||||
MonitoredTask t = task.get();
|
||||
if (!filter.filter(t)) {
|
||||
results.add(t.clone());
|
||||
}
|
||||
for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator();
|
||||
it.hasNext();) {
|
||||
TaskAndWeakRefPair pair = it.next();
|
||||
MonitoredTask t = pair.get();
|
||||
ret.add(t.clone());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private boolean canPurge(MonitoredTask stat) {
|
||||
|
@ -280,4 +310,45 @@ public class TaskMonitor {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private interface TaskFilter {
|
||||
enum TaskType {
|
||||
GENERAL("general"),
|
||||
HANDLER("handler"),
|
||||
RPC("rpc"),
|
||||
OPERATION("operation"),
|
||||
ALL("all");
|
||||
|
||||
private String type;
|
||||
|
||||
private TaskType(String type) {
|
||||
this.type = type.toLowerCase();
|
||||
}
|
||||
|
||||
static TaskType getTaskType(String type) {
|
||||
if (type == null || type.isEmpty()) {
|
||||
return ALL;
|
||||
}
|
||||
type = type.toLowerCase();
|
||||
for (TaskType taskType : values()) {
|
||||
if (taskType.toString().equals(type)) {
|
||||
return taskType;
|
||||
}
|
||||
}
|
||||
return ALL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return type;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter out unwanted task.
|
||||
* @param task monitored task
|
||||
* @return false if a task is accepted, true if it is filtered
|
||||
*/
|
||||
boolean filter(MonitoredTask t);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,9 +20,15 @@ package org.apache.hadoop.hbase.monitoring;
|
|||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Query;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -143,5 +149,47 @@ public class TestTaskMonitor {
|
|||
tm.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTasksWithFilter() throws Exception {
|
||||
TaskMonitor tm = new TaskMonitor(new Configuration());
|
||||
assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
|
||||
// Create 5 general tasks
|
||||
tm.createStatus("General task1");
|
||||
tm.createStatus("General task2");
|
||||
tm.createStatus("General task3");
|
||||
tm.createStatus("General task4");
|
||||
tm.createStatus("General task5");
|
||||
// Create 5 rpc tasks, and mark 1 completed
|
||||
int length = 5;
|
||||
ArrayList<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length);
|
||||
for (int i = 0; i < length; i++) {
|
||||
MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i);
|
||||
rpcHandlers.add(rpcHandler);
|
||||
}
|
||||
// Create rpc opertions
|
||||
byte[] row = new byte[] { 0x01 };
|
||||
Mutation m = new Put(row);
|
||||
Query q = new Scan();
|
||||
String notOperation = "for test";
|
||||
rpcHandlers.get(0).setRPC("operations", new Object[]{ m, q }, 3000);
|
||||
rpcHandlers.get(1).setRPC("operations", new Object[]{ m, q }, 3000);
|
||||
rpcHandlers.get(2).setRPC("operations", new Object[]{ m, q }, 3000);
|
||||
rpcHandlers.get(3).setRPC("operations", new Object[]{ notOperation }, 3000);
|
||||
rpcHandlers.get(4).setRPC("operations", new Object[]{ m, q }, 3000);
|
||||
MonitoredRPCHandler completed = rpcHandlers.get(4);
|
||||
completed.markComplete("Completed!");
|
||||
// Test get tasks with filter
|
||||
List<MonitoredTask> generalTasks = tm.getTasks("general");
|
||||
assertEquals(5, generalTasks.size());
|
||||
List<MonitoredTask> handlerTasks = tm.getTasks("handler");
|
||||
assertEquals(5, handlerTasks.size());
|
||||
List<MonitoredTask> rpcTasks = tm.getTasks("rpc");
|
||||
// The last rpc handler is stopped
|
||||
assertEquals(4, rpcTasks.size());
|
||||
List<MonitoredTask> operationTasks = tm.getTasks("operation");
|
||||
// Handler 3 doesn't handle Operation.
|
||||
assertEquals(3, operationTasks.size());
|
||||
tm.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue