HBASE-2774 Spin in ReadWriteConsistencyControl eating CPU (load > 40) and no progress running YCSB on clean cluster startup

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@957411 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-06-24 04:35:22 +00:00
parent e917af75fc
commit 878f8e5fa3
2 changed files with 28 additions and 14 deletions

View File

@ -417,6 +417,8 @@ Release 0.21.0 - Unreleased
average - slop; endless churn average - slop; endless churn
HBASE-2729 Interrupted or failed memstore flushes should not corrupt the HBASE-2729 Interrupted or failed memstore flushes should not corrupt the
region region
HBASE-2774 Spin in ReadWriteConsistencyControl eating CPU (load > 40) and
no progress running YCSB on clean cluster startup
IMPROVEMENTS IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable HBASE-1760 Cleanup TODOs in HTable

View File

@ -10,8 +10,11 @@ import java.util.concurrent.atomic.AtomicLong;
* the new writes for readers to read (thus forming atomic transactions). * the new writes for readers to read (thus forming atomic transactions).
*/ */
public class ReadWriteConsistencyControl { public class ReadWriteConsistencyControl {
private final AtomicLong memstoreRead = new AtomicLong(); private volatile long memstoreRead = 0;
private final AtomicLong memstoreWrite = new AtomicLong(); private volatile long memstoreWrite = 0;
private final Object readWaiters = new Object();
// This is the pending queue of writes. // This is the pending queue of writes.
private final LinkedList<WriteEntry> writeQueue = private final LinkedList<WriteEntry> writeQueue =
new LinkedList<WriteEntry>(); new LinkedList<WriteEntry>();
@ -34,12 +37,13 @@ public class ReadWriteConsistencyControl {
public WriteEntry beginMemstoreInsert() { public WriteEntry beginMemstoreInsert() {
synchronized (writeQueue) { synchronized (writeQueue) {
long nextWriteNumber = memstoreWrite.incrementAndGet(); long nextWriteNumber = ++memstoreWrite;
WriteEntry e = new WriteEntry(nextWriteNumber); WriteEntry e = new WriteEntry(nextWriteNumber);
writeQueue.add(e); writeQueue.add(e);
return e; return e;
} }
} }
public void completeMemstoreInsert(WriteEntry e) { public void completeMemstoreInsert(WriteEntry e) {
synchronized (writeQueue) { synchronized (writeQueue) {
e.markCompleted(); e.markCompleted();
@ -70,24 +74,32 @@ public class ReadWriteConsistencyControl {
} }
if (nextReadValue > 0) { if (nextReadValue > 0) {
memstoreRead.set(nextReadValue); memstoreRead = nextReadValue;
synchronized (readWaiters) {
readWaiters.notifyAll();
}
} }
} }
// Spin until any other concurrent puts have finished. This makes sure that boolean interrupted = false;
// if we move on to construct a scanner, we'll get read-your-own-writes while (memstoreRead < e.getWriteNumber()) {
// consistency. We anticipate that since puts to the memstore are very fast, synchronized (readWaiters) {
// this will be on the order of microseconds - so spinning should be faster try {
// than a condition variable. readWaiters.wait(0);
int spun = 0; } catch (InterruptedException e) {
while (memstoreRead.get() < e.getWriteNumber()) { // We were interrupted... finish the loop -- i.e. cleanup --and then
spun++; // on our way out, reset the interrupt flag.
interrupted = true;
}
}
} }
// Could potentially expose spun as a metric if (interrupted) Thread.currentThread.interrupt();
} }
public long memstoreReadPoint() { public long memstoreReadPoint() {
return memstoreRead.get(); return memstoreRead;
} }