HBASE-18248 Warn if monitored RPC task has been tied up beyond a configurable threshold
This commit is contained in:
parent
f855b51650
commit
a902175553
|
@ -251,6 +251,12 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
if (getState() != State.RUNNING) {
|
||||
return super.toString();
|
||||
}
|
||||
return super.toString() + ", rpcMethod=" + getRPC();
|
||||
return super.toString()
|
||||
+ ", queuetimems=" + getRPCQueueTime()
|
||||
+ ", starttimems=" + getRPCStartTime()
|
||||
+ ", clientaddress=" + clientAddress
|
||||
+ ", remoteport=" + remotePort
|
||||
+ ", packetlength=" + getRPCPacketLength()
|
||||
+ ", rpcMethod=" + getRPC();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ public interface MonitoredTask extends Cloneable {
|
|||
State getState();
|
||||
long getStateTime();
|
||||
long getCompletionTimestamp();
|
||||
long getWarnTime();
|
||||
|
||||
void markComplete(String msg);
|
||||
void pause(String msg);
|
||||
|
@ -48,6 +49,7 @@ public interface MonitoredTask extends Cloneable {
|
|||
|
||||
void setStatus(String status);
|
||||
void setDescription(String description);
|
||||
void setWarnTime(final long t);
|
||||
|
||||
/**
|
||||
* Explicitly mark this status as able to be cleaned up,
|
||||
|
|
|
@ -30,7 +30,8 @@ class MonitoredTaskImpl implements MonitoredTask {
|
|||
private long startTime;
|
||||
private long statusTime;
|
||||
private long stateTime;
|
||||
|
||||
private long warnTime;
|
||||
|
||||
private volatile String status;
|
||||
private volatile String description;
|
||||
|
||||
|
@ -42,6 +43,7 @@ class MonitoredTaskImpl implements MonitoredTask {
|
|||
startTime = System.currentTimeMillis();
|
||||
statusTime = startTime;
|
||||
stateTime = startTime;
|
||||
warnTime = startTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -82,7 +84,12 @@ class MonitoredTaskImpl implements MonitoredTask {
|
|||
public long getStateTime() {
|
||||
return stateTime;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getWarnTime() {
|
||||
return warnTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCompletionTimestamp() {
|
||||
if (state == State.COMPLETE || state == State.ABORTED) {
|
||||
|
@ -131,6 +138,11 @@ class MonitoredTaskImpl implements MonitoredTask {
|
|||
this.description = description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWarnTime(long t) {
|
||||
this.warnTime = t;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
if (state == State.RUNNING) {
|
||||
|
|
|
@ -30,9 +30,12 @@ import java.util.List;
|
|||
import org.apache.commons.collections.buffer.CircularFifoBuffer;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
|
@ -44,16 +47,35 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
|||
public class TaskMonitor {
|
||||
private static final Log LOG = LogFactory.getLog(TaskMonitor.class);
|
||||
|
||||
// Don't keep around any tasks that have completed more than
|
||||
// 60 seconds ago
|
||||
private static final long EXPIRATION_TIME = 60*1000;
|
||||
public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks";
|
||||
public static final int DEFAULT_MAX_TASKS = 1000;
|
||||
public static final String RPC_WARN_TIME_KEY = "hbase.taskmonitor.rpc.warn.time";
|
||||
public static final long DEFAULT_RPC_WARN_TIME = 0;
|
||||
public static final String EXPIRATION_TIME_KEY = "hbase.taskmonitor.expiration.time";
|
||||
public static final long DEFAULT_EXPIRATION_TIME = 60*1000;
|
||||
public static final String MONITOR_INTERVAL_KEY = "hbase.taskmonitor.monitor.interval";
|
||||
public static final long DEFAULT_MONITOR_INTERVAL = 10*1000;
|
||||
|
||||
@VisibleForTesting
|
||||
static final int MAX_TASKS = 1000;
|
||||
|
||||
private static TaskMonitor instance;
|
||||
private CircularFifoBuffer tasks = new CircularFifoBuffer(MAX_TASKS);
|
||||
private List<TaskAndWeakRefPair> rpcTasks = Lists.newArrayList();
|
||||
|
||||
private final int maxTasks;
|
||||
private final long rpcWarnTime;
|
||||
private final long expirationTime;
|
||||
private final CircularFifoBuffer tasks;
|
||||
private final List<TaskAndWeakRefPair> rpcTasks;
|
||||
private final long monitorInterval;
|
||||
private Thread monitorThread;
|
||||
|
||||
TaskMonitor(Configuration conf) {
|
||||
maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
|
||||
expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME);
|
||||
rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
|
||||
tasks = new CircularFifoBuffer(maxTasks);
|
||||
rpcTasks = Lists.newArrayList();
|
||||
monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL);
|
||||
monitorThread = new Thread(new MonitorRunnable());
|
||||
Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for TaskMonitor");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get singleton instance.
|
||||
|
@ -61,7 +83,7 @@ public class TaskMonitor {
|
|||
*/
|
||||
public static synchronized TaskMonitor get() {
|
||||
if (instance == null) {
|
||||
instance = new TaskMonitor();
|
||||
instance = new TaskMonitor(HBaseConfiguration.create());
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
@ -93,6 +115,22 @@ public class TaskMonitor {
|
|||
return proxy;
|
||||
}
|
||||
|
||||
private synchronized void warnStuckTasks() {
|
||||
if (rpcWarnTime > 0) {
|
||||
final long now = EnvironmentEdgeManager.currentTime();
|
||||
for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator();
|
||||
it.hasNext();) {
|
||||
TaskAndWeakRefPair pair = it.next();
|
||||
MonitoredTask stat = pair.get();
|
||||
if ((stat.getState() == MonitoredTaskImpl.State.RUNNING) &&
|
||||
(now >= stat.getWarnTime() + rpcWarnTime)) {
|
||||
LOG.warn("Task may be stuck: " + stat);
|
||||
stat.setWarnTime(now);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void purgeExpiredTasks() {
|
||||
for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
|
||||
it.hasNext();) {
|
||||
|
@ -139,12 +177,11 @@ public class TaskMonitor {
|
|||
|
||||
private boolean canPurge(MonitoredTask stat) {
|
||||
long cts = stat.getCompletionTimestamp();
|
||||
return (cts > 0 && System.currentTimeMillis() - cts > EXPIRATION_TIME);
|
||||
return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime);
|
||||
}
|
||||
|
||||
|
||||
public void dumpAsText(PrintWriter out) {
|
||||
long now = System.currentTimeMillis();
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
|
||||
List<MonitoredTask> tasks = getTasks();
|
||||
for (MonitoredTask task : tasks) {
|
||||
|
@ -164,6 +201,12 @@ public class TaskMonitor {
|
|||
}
|
||||
}
|
||||
|
||||
public synchronized void shutdown() {
|
||||
if (this.monitorThread != null) {
|
||||
monitorThread.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This class encapsulates an object as well as a weak reference to a proxy
|
||||
* that passes through calls to that object. In art form:
|
||||
|
@ -218,4 +261,23 @@ public class TaskMonitor {
|
|||
return method.invoke(delegatee, args);
|
||||
}
|
||||
}
|
||||
|
||||
private class MonitorRunnable implements Runnable {
|
||||
private boolean running = true;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (running) {
|
||||
try {
|
||||
Thread.sleep(monitorInterval);
|
||||
if (tasks.isFull()) {
|
||||
purgeExpiredTasks();
|
||||
}
|
||||
warnStuckTasks();
|
||||
} catch (InterruptedException e) {
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,8 +22,10 @@ import static org.junit.Assert.*;
|
|||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
|
@ -32,7 +34,7 @@ public class TestTaskMonitor {
|
|||
|
||||
@Test
|
||||
public void testTaskMonitorBasics() {
|
||||
TaskMonitor tm = new TaskMonitor();
|
||||
TaskMonitor tm = new TaskMonitor(new Configuration());
|
||||
assertTrue("Task monitor should start empty",
|
||||
tm.getTasks().isEmpty());
|
||||
|
||||
|
@ -55,11 +57,13 @@ public class TestTaskMonitor {
|
|||
// If we mark its completion time back a few minutes, it should get gced
|
||||
task.expireNow();
|
||||
assertEquals(0, tm.getTasks().size());
|
||||
|
||||
tm.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTasksGetAbortedOnLeak() throws InterruptedException {
|
||||
final TaskMonitor tm = new TaskMonitor();
|
||||
final TaskMonitor tm = new TaskMonitor(new Configuration());
|
||||
assertTrue("Task monitor should start empty",
|
||||
tm.getTasks().isEmpty());
|
||||
|
||||
|
@ -86,42 +90,58 @@ public class TestTaskMonitor {
|
|||
// Now it should be aborted
|
||||
MonitoredTask taskFromTm = tm.getTasks().get(0);
|
||||
assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState());
|
||||
|
||||
tm.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTaskLimit() throws Exception {
|
||||
TaskMonitor tm = new TaskMonitor();
|
||||
for (int i = 0; i < TaskMonitor.MAX_TASKS + 10; i++) {
|
||||
TaskMonitor tm = new TaskMonitor(new Configuration());
|
||||
for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) {
|
||||
tm.createStatus("task " + i);
|
||||
}
|
||||
// Make sure it was limited correctly
|
||||
assertEquals(TaskMonitor.MAX_TASKS, tm.getTasks().size());
|
||||
assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size());
|
||||
// Make sure we culled the earlier tasks, not later
|
||||
// (i.e. tasks 0 through 9 should have been deleted)
|
||||
assertEquals("task 10", tm.getTasks().get(0).getDescription());
|
||||
tm.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDoNotPurgeRPCTask() throws Exception {
|
||||
int RPCTaskNums = 10;
|
||||
TaskMonitor tm = TaskMonitor.get();
|
||||
for(int i = 0; i < RPCTaskNums; i++) {
|
||||
TaskMonitor.get().createRPCStatus("PRCTask" + i);
|
||||
tm.createRPCStatus("PRCTask" + i);
|
||||
}
|
||||
for(int i = 0; i < TaskMonitor.MAX_TASKS; i++) {
|
||||
TaskMonitor.get().createStatus("otherTask" + i);
|
||||
for(int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) {
|
||||
tm.createStatus("otherTask" + i);
|
||||
}
|
||||
int remainRPCTask = 0;
|
||||
for(MonitoredTask task :TaskMonitor.get().getTasks()) {
|
||||
for(MonitoredTask task: tm.getTasks()) {
|
||||
if(task instanceof MonitoredRPCHandler) {
|
||||
remainRPCTask++;
|
||||
}
|
||||
}
|
||||
assertEquals("RPC Tasks have been purged!", RPCTaskNums, remainRPCTask);
|
||||
|
||||
tm.shutdown();
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testWarnStuckTasks() throws Exception {
|
||||
final int INTERVAL = 1000;
|
||||
Configuration conf = new Configuration();
|
||||
conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, INTERVAL);
|
||||
conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, INTERVAL);
|
||||
final TaskMonitor tm = new TaskMonitor(conf);
|
||||
MonitoredRPCHandler t = tm.createRPCStatus("test task");
|
||||
long then = EnvironmentEdgeManager.currentTime();
|
||||
t.setRPC("testMethod", new Object[0], then);
|
||||
Thread.sleep(INTERVAL * 2);
|
||||
assertTrue("We did not warn", t.getWarnTime() > then);
|
||||
tm.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue