HBASE-2616 TestHRegion.testWritesWhileGetting flaky on trunk

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@952796 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ryan Rawson 2010-06-08 20:11:50 +00:00
parent 63e72a9228
commit fc52e8d359
6 changed files with 90 additions and 57 deletions

View File

@ -377,6 +377,7 @@ Release 0.21.0 - Unreleased
HBASE-2614 killing server in TestMasterTransitions causes NPEs and test deadlock HBASE-2614 killing server in TestMasterTransitions causes NPEs and test deadlock
HBASE-2615 M/R on bulk imported tables HBASE-2615 M/R on bulk imported tables
HBASE-2676 TestInfoServers should use ephemeral ports HBASE-2676 TestInfoServers should use ephemeral ports
HBASE-2616 TestHRegion.testWritesWhileGetting flaky on trunk
IMPROVEMENTS IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable HBASE-1760 Cleanup TODOs in HTable

View File

@ -982,13 +982,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
storeFlushers.add(s.getStoreFlusher(completeSequenceId)); storeFlushers.add(s.getStoreFlusher(completeSequenceId));
} }
// This thread is going to cause a whole bunch of scanners to reseek.
// They are depending
// on a thread-local to know where to read from.
// The reason why we set it up high is so that each HRegionScanner only
// has a single read point for all its sub-StoreScanners.
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
// prepare flush (take a snapshot) // prepare flush (take a snapshot)
for (StoreFlusher flusher : storeFlushers) { for (StoreFlusher flusher : storeFlushers) {
flusher.prepare(); flusher.prepare();
@ -1026,9 +1019,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
} }
try { try {
// update this again to make sure we are 'fresh'
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
if (atomicWork != null) { if (atomicWork != null) {
atomicWork.call(); atomicWork.call();
} }
@ -1452,7 +1442,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* @param family * @param family
* @param qualifier * @param qualifier
* @param expectedValue * @param expectedValue
* @param put
* @param lockId * @param lockId
* @param writeToWAL * @param writeToWAL
* @throws IOException * @throws IOException
@ -1949,14 +1938,12 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
private final byte [] stopRow; private final byte [] stopRow;
private Filter filter; private Filter filter;
private List<KeyValue> results = new ArrayList<KeyValue>(); private List<KeyValue> results = new ArrayList<KeyValue>();
private int isScan;
private int batch; private int batch;
// Doesn't need to be volatile, always accessed under a sync'ed method private int isScan;
private boolean filterClosed = false; private boolean filterClosed = false;
private Scan theScan = null; private long readPt;
private List<KeyValueScanner> extraScanners = null;
RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) { RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
//DebugPrint.println("HRegionScanner.<init>"); //DebugPrint.println("HRegionScanner.<init>");
this.filter = scan.getFilter(); this.filter = scan.getFilter();
this.batch = scan.getBatch(); this.batch = scan.getBatch();
@ -1968,35 +1955,33 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// If we are doing a get, we want to be [startRow,endRow] normally // If we are doing a get, we want to be [startRow,endRow] normally
// it is [startRow,endRow) and if startRow=endRow we get nothing. // it is [startRow,endRow) and if startRow=endRow we get nothing.
this.isScan = scan.isGetScan() ? -1 : 0; this.isScan = scan.isGetScan() ? -1 : 0;
this.theScan = scan;
this.extraScanners = additionalScanners;
}
RegionScanner(Scan scan) { this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
this(scan, null);
}
void initHeap() throws IOException {
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
if (extraScanners != null) { if (additionalScanners != null) {
scanners.addAll(extraScanners); scanners.addAll(additionalScanners);
} }
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
theScan.getFamilyMap().entrySet()) { scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey()); Store store = stores.get(entry.getKey());
scanners.add(store.getScanner(theScan, entry.getValue())); scanners.add(store.getScanner(scan, entry.getValue()));
} }
this.storeHeap = new KeyValueHeap(scanners, comparator); this.storeHeap = new KeyValueHeap(scanners, comparator);
} }
private void resetFilters() { RegionScanner(Scan scan) throws IOException {
this(scan, null);
}
/**
* Reset both the filter and the old filter.
*/
protected void resetFilters() {
if (filter != null) { if (filter != null) {
filter.reset(); filter.reset();
} }
// Start the next row read and reset the thread point
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
} }
public synchronized boolean next(List<KeyValue> outResults, int limit) public synchronized boolean next(List<KeyValue> outResults, int limit)
@ -2013,11 +1998,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
} }
// This could be a new thread from the last time we called next(). // This could be a new thread from the last time we called next().
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); ReadWriteConsistencyControl.setThreadReadPoint(this.readPt);
// lazy init the store heap.
if (storeHeap == null) {
initHeap();
}
results.clear(); results.clear();
boolean returnResult = nextInternal(limit); boolean returnResult = nextInternal(limit);

View File

@ -23,6 +23,10 @@ public class ReadWriteConsistencyControl {
return perThreadReadPoint.get(); return perThreadReadPoint.get();
} }
public static void setThreadReadPoint(long readPoint) {
perThreadReadPoint.set(readPoint);
}
public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) { public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
perThreadReadPoint.set(rwcc.memstoreReadPoint()); perThreadReadPoint.set(rwcc.memstoreReadPoint());
return getThreadReadPoint(); return getThreadReadPoint();

View File

@ -1064,11 +1064,8 @@ public class Store implements HConstants, HeapSize {
if (result != null) { if (result != null) {
newStoreFiles.add(result); newStoreFiles.add(result);
} }
this.storefiles = ImmutableList.copyOf(newStoreFiles);
// WARN ugly hack here, but necessary sadly. this.storefiles = ImmutableList.copyOf(newStoreFiles);
// TODO why is this necessary? need a comment here if it's unintuitive!
ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
// Tell observers that list of StoreFiles has changed. // Tell observers that list of StoreFiles has changed.
notifyChangedReadersObservers(); notifyChangedReadersObservers();

View File

@ -42,9 +42,13 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
private boolean cacheBlocks; private boolean cacheBlocks;
// Used to indicate that the scanner has closed (see HBASE-1107) // Used to indicate that the scanner has closed (see HBASE-1107)
// Doesnt need to be volatile because it's always accessed via synchronized methods
private boolean closing = false; private boolean closing = false;
private final boolean isGet; private final boolean isGet;
// if heap == null and lastTop != null, you need to reseek given the key below
private KeyValue lastTop = null;
/** /**
* Opens a scanner across memstore, snapshot, and all StoreFiles. * Opens a scanner across memstore, snapshot, and all StoreFiles.
* *
@ -165,6 +169,15 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
} }
public synchronized KeyValue peek() { public synchronized KeyValue peek() {
try {
checkReseek();
} catch (IOException e) {
throw new RuntimeException("IOE conversion", e);
}
if (this.heap == null) {
return null;
}
return this.heap.peek(); return this.heap.peek();
} }
@ -179,10 +192,18 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
// under test, we dont have a this.store // under test, we dont have a this.store
if (this.store != null) if (this.store != null)
this.store.deleteChangedReaderObserver(this); this.store.deleteChangedReaderObserver(this);
if (this.heap != null)
this.heap.close(); this.heap.close();
} }
public synchronized boolean seek(KeyValue key) throws IOException { public synchronized boolean seek(KeyValue key) throws IOException {
if (this.heap == null) {
List<KeyValueScanner> scanners = getScanners();
heap = new KeyValueHeap(scanners, store.comparator);
}
return this.heap.seek(key); return this.heap.seek(key);
} }
@ -194,11 +215,22 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
*/ */
public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException { public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
//DebugPrint.println("SS.next"); //DebugPrint.println("SS.next");
checkReseek();
// if the heap was left null, then the scanners had previously run out anyways, close and
// return.
if (this.heap == null) {
close();
return false;
}
KeyValue peeked = this.heap.peek(); KeyValue peeked = this.heap.peek();
if (peeked == null) { if (peeked == null) {
close(); close();
return false; return false;
} }
matcher.setRow(peeked.getRow()); matcher.setRow(peeked.getRow());
KeyValue kv; KeyValue kv;
List<KeyValue> results = new ArrayList<KeyValue>(); List<KeyValue> results = new ArrayList<KeyValue>();
@ -265,17 +297,37 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
// Implementation of ChangedReadersObserver // Implementation of ChangedReadersObserver
public synchronized void updateReaders() throws IOException { public synchronized void updateReaders() throws IOException {
if (this.closing) return; if (this.closing) return;
KeyValue topKey = this.peek();
if (topKey == null) return;
List<KeyValueScanner> scanners = getScanners(); // this could be null.
this.lastTop = this.peek();
// close the previous scanners: //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
// close scanners to old obsolete Store files
this.heap.close(); // bubble thru and close all scanners. this.heap.close(); // bubble thru and close all scanners.
this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
// Let the next() call handle re-creating and seeking
}
private void checkReseek() throws IOException {
if (this.heap == null && this.lastTop != null) {
reseek(this.lastTop);
this.lastTop = null; // gone!
}
// else dont need to reseek
}
private void reseek(KeyValue lastTopKey) throws IOException {
if (heap != null) {
throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
}
List<KeyValueScanner> scanners = getScanners();
for(KeyValueScanner scanner : scanners) { for(KeyValueScanner scanner : scanners) {
scanner.seek(topKey); scanner.seek(lastTopKey);
} }
// Combine all seeked scanners with a heap // Combine all seeked scanners with a heap
@ -284,6 +336,6 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
// Reset the state of the Query Matcher and set to top row // Reset the state of the Query Matcher and set to top row
matcher.reset(); matcher.reset();
KeyValue kv = heap.peek(); KeyValue kv = heap.peek();
matcher.setRow((kv == null ? topKey : kv).getRow()); matcher.setRow((kv == null ? lastTopKey : kv).getRow());
} }
} }

View File

@ -1231,12 +1231,10 @@ public class TestHRegion extends HBaseTestCase {
scan.addFamily(fam2); scan.addFamily(fam2);
scan.addFamily(fam4); scan.addFamily(fam4);
is = (RegionScanner) region.getScanner(scan); is = (RegionScanner) region.getScanner(scan);
is.initHeap(); // i dont like this test
assertEquals(1, ((RegionScanner)is).storeHeap.getHeap().size()); assertEquals(1, ((RegionScanner)is).storeHeap.getHeap().size());
scan = new Scan(); scan = new Scan();
is = (RegionScanner) region.getScanner(scan); is = (RegionScanner) region.getScanner(scan);
is.initHeap();
assertEquals(families.length -1, assertEquals(families.length -1,
((RegionScanner)is).storeHeap.getHeap().size()); ((RegionScanner)is).storeHeap.getHeap().size());
} }