HBASE-10312 Flooding the cluster with administrative actions leads to collapse

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1587742 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2014-04-15 22:46:47 +00:00
parent 9f874d4d09
commit 9fcb2ae865
1 changed files with 5 additions and 18 deletions

View File

@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -51,8 +52,7 @@ public class TaskMonitor {
static final int MAX_TASKS = 1000; static final int MAX_TASKS = 1000;
private static TaskMonitor instance; private static TaskMonitor instance;
private List<TaskAndWeakRefPair> tasks = private CircularFifoBuffer tasks = new CircularFifoBuffer(MAX_TASKS);
Lists.newArrayList();
/** /**
* Get singleton instance. * Get singleton instance.
@ -74,9 +74,6 @@ public class TaskMonitor {
new PassthroughInvocationHandler<MonitoredTask>(stat)); new PassthroughInvocationHandler<MonitoredTask>(stat));
TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy); TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
tasks.add(pair); tasks.add(pair);
if (tasks.size() > MAX_TASKS) {
purgeExpiredTasks();
}
return proxy; return proxy;
} }
@ -89,15 +86,10 @@ public class TaskMonitor {
new PassthroughInvocationHandler<MonitoredRPCHandler>(stat)); new PassthroughInvocationHandler<MonitoredRPCHandler>(stat));
TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy); TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
tasks.add(pair); tasks.add(pair);
if (tasks.size() > MAX_TASKS) {
purgeExpiredTasks();
}
return proxy; return proxy;
} }
private synchronized void purgeExpiredTasks() { private synchronized void purgeExpiredTasks() {
int size = 0;
for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
it.hasNext();) { it.hasNext();) {
TaskAndWeakRefPair pair = it.next(); TaskAndWeakRefPair pair = it.next();
@ -114,15 +106,8 @@ public class TaskMonitor {
if (canPurge(stat)) { if (canPurge(stat)) {
it.remove(); it.remove();
} else {
size++;
} }
} }
if (size > MAX_TASKS) {
LOG.warn("Too many actions in action monitor! Purging some.");
tasks = tasks.subList(size - MAX_TASKS, size);
}
} }
/** /**
@ -133,7 +118,9 @@ public class TaskMonitor {
public synchronized List<MonitoredTask> getTasks() { public synchronized List<MonitoredTask> getTasks() {
purgeExpiredTasks(); purgeExpiredTasks();
ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size()); ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
for (TaskAndWeakRefPair pair : tasks) { for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
it.hasNext();) {
TaskAndWeakRefPair pair = it.next();
MonitoredTask t = pair.get(); MonitoredTask t = pair.get();
ret.add(t.clone()); ret.add(t.clone());
} }