HBASE-15871 Memstore flush doesn't finish because of backwardseek() in
memstore scanner. (Ram)
This commit is contained in:
parent
250ad644e4
commit
7d0a6a82ab
|
@ -131,6 +131,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Cell peek() {
|
public Cell peek() {
|
||||||
|
if (closed) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
if (this.heap != null) {
|
if (this.heap != null) {
|
||||||
return this.heap.peek();
|
return this.heap.peek();
|
||||||
}
|
}
|
||||||
|
@ -143,6 +146,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Cell next() throws IOException {
|
public Cell next() throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
if(this.heap != null) {
|
if(this.heap != null) {
|
||||||
// loop over till the next suitable value
|
// loop over till the next suitable value
|
||||||
// take next value from the heap
|
// take next value from the heap
|
||||||
|
@ -167,6 +173,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean seek(Cell cell) throws IOException {
|
public boolean seek(Cell cell) throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
initForwardKVHeapIfNeeded(comparator, scanners);
|
initForwardKVHeapIfNeeded(comparator, scanners);
|
||||||
|
|
||||||
if (cell == null) {
|
if (cell == null) {
|
||||||
|
@ -199,6 +208,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||||
*
|
*
|
||||||
* TODO: The above comment copied from the original MemStoreScanner
|
* TODO: The above comment copied from the original MemStoreScanner
|
||||||
*/
|
*/
|
||||||
|
if (closed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
initForwardKVHeapIfNeeded(comparator, scanners);
|
initForwardKVHeapIfNeeded(comparator, scanners);
|
||||||
return heap.reseek(cell);
|
return heap.reseek(cell);
|
||||||
}
|
}
|
||||||
|
@ -241,6 +253,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||||
public boolean backwardSeek(Cell cell) throws IOException {
|
public boolean backwardSeek(Cell cell) throws IOException {
|
||||||
// The first time when this happens it sets the scanners to the seek key
|
// The first time when this happens it sets the scanners to the seek key
|
||||||
// passed by the incoming scan's start row
|
// passed by the incoming scan's start row
|
||||||
|
if (closed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
initReverseKVHeapIfNeeded(cell, comparator, scanners);
|
initReverseKVHeapIfNeeded(cell, comparator, scanners);
|
||||||
return heap.backwardSeek(cell);
|
return heap.backwardSeek(cell);
|
||||||
}
|
}
|
||||||
|
@ -253,6 +268,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean seekToPreviousRow(Cell cell) throws IOException {
|
public boolean seekToPreviousRow(Cell cell) throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
initReverseKVHeapIfNeeded(cell, comparator, scanners);
|
initReverseKVHeapIfNeeded(cell, comparator, scanners);
|
||||||
if (heap.peek() == null) {
|
if (heap.peek() == null) {
|
||||||
restartBackwardHeap(cell);
|
restartBackwardHeap(cell);
|
||||||
|
@ -262,6 +280,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean seekToLastRow() throws IOException {
|
public boolean seekToLastRow() throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return initReverseKVHeapIfNeeded(KeyValue.LOWESTKEY, comparator, scanners);
|
return initReverseKVHeapIfNeeded(KeyValue.LOWESTKEY, comparator, scanners);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -56,6 +56,9 @@ public class SegmentScanner implements KeyValueScanner {
|
||||||
// last iterated KVs by seek (to restore the iterator state after reseek)
|
// last iterated KVs by seek (to restore the iterator state after reseek)
|
||||||
private Cell last = null;
|
private Cell last = null;
|
||||||
|
|
||||||
|
// flag to indicate if this scanner is closed
|
||||||
|
private boolean closed = false;
|
||||||
|
|
||||||
protected SegmentScanner(Segment segment, long readPoint) {
|
protected SegmentScanner(Segment segment, long readPoint) {
|
||||||
this(segment, readPoint, DEFAULT_SCANNER_ORDER);
|
this(segment, readPoint, DEFAULT_SCANNER_ORDER);
|
||||||
}
|
}
|
||||||
|
@ -73,6 +76,10 @@ public class SegmentScanner implements KeyValueScanner {
|
||||||
// the initialization of the current is required for working with heap of SegmentScanners
|
// the initialization of the current is required for working with heap of SegmentScanners
|
||||||
current = getNext();
|
current = getNext();
|
||||||
this.scannerOrder = scannerOrder;
|
this.scannerOrder = scannerOrder;
|
||||||
|
if (current == null) {
|
||||||
|
// nothing to fetch from this scanner
|
||||||
|
close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -81,6 +88,9 @@ public class SegmentScanner implements KeyValueScanner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Cell peek() { // sanity check, the current should be always valid
|
public Cell peek() { // sanity check, the current should be always valid
|
||||||
|
if (closed) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
if (current!=null && current.getSequenceId() > readPoint) {
|
if (current!=null && current.getSequenceId() > readPoint) {
|
||||||
throw new RuntimeException("current is invalid: read point is "+readPoint+", " +
|
throw new RuntimeException("current is invalid: read point is "+readPoint+", " +
|
||||||
"while current sequence id is " +current.getSequenceId());
|
"while current sequence id is " +current.getSequenceId());
|
||||||
|
@ -94,6 +104,9 @@ public class SegmentScanner implements KeyValueScanner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Cell next() throws IOException {
|
public Cell next() throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
Cell oldCurrent = current;
|
Cell oldCurrent = current;
|
||||||
current = getNext(); // update the currently observed Cell
|
current = getNext(); // update the currently observed Cell
|
||||||
return oldCurrent;
|
return oldCurrent;
|
||||||
|
@ -106,6 +119,9 @@ public class SegmentScanner implements KeyValueScanner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean seek(Cell cell) throws IOException {
|
public boolean seek(Cell cell) throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if(cell == null) {
|
if(cell == null) {
|
||||||
close();
|
close();
|
||||||
return false;
|
return false;
|
||||||
|
@ -129,7 +145,9 @@ public class SegmentScanner implements KeyValueScanner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean reseek(Cell cell) throws IOException {
|
public boolean reseek(Cell cell) throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
/*
|
/*
|
||||||
See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
|
See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
|
||||||
This code is executed concurrently with flush and puts, without locks.
|
This code is executed concurrently with flush and puts, without locks.
|
||||||
|
@ -155,6 +173,9 @@ public class SegmentScanner implements KeyValueScanner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean backwardSeek(Cell key) throws IOException {
|
public boolean backwardSeek(Cell key) throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
seek(key); // seek forward then go backward
|
seek(key); // seek forward then go backward
|
||||||
if (peek() == null || segment.compareRows(peek(), key) > 0) {
|
if (peek() == null || segment.compareRows(peek(), key) > 0) {
|
||||||
return seekToPreviousRow(key);
|
return seekToPreviousRow(key);
|
||||||
|
@ -172,6 +193,9 @@ public class SegmentScanner implements KeyValueScanner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean seekToPreviousRow(Cell cell) throws IOException {
|
public boolean seekToPreviousRow(Cell cell) throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
boolean keepSeeking;
|
boolean keepSeeking;
|
||||||
Cell key = cell;
|
Cell key = cell;
|
||||||
do {
|
do {
|
||||||
|
@ -205,6 +229,9 @@ public class SegmentScanner implements KeyValueScanner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean seekToLastRow() throws IOException {
|
public boolean seekToLastRow() throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
Cell higherCell = segment.isEmpty() ? null : segment.last();
|
Cell higherCell = segment.isEmpty() ? null : segment.last();
|
||||||
if (higherCell == null) {
|
if (higherCell == null) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -232,7 +259,11 @@ public class SegmentScanner implements KeyValueScanner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
if (closed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
getSegment().decScannerCount();
|
getSegment().decScannerCount();
|
||||||
|
closed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -5901,6 +5901,64 @@ public class TestHRegion {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception {
|
||||||
|
byte[] cf1 = Bytes.toBytes("CF1");
|
||||||
|
byte[][] families = { cf1 };
|
||||||
|
byte[] col = Bytes.toBytes("C");
|
||||||
|
String method = this.getName();
|
||||||
|
HBaseConfiguration conf = new HBaseConfiguration();
|
||||||
|
this.region = initHRegion(tableName, method, conf, families);
|
||||||
|
try {
|
||||||
|
// setup with one storefile and one memstore, to create scanner and get an earlier readPt
|
||||||
|
Put put = new Put(Bytes.toBytes("19996"));
|
||||||
|
put.addColumn(cf1, col, Bytes.toBytes("val"));
|
||||||
|
region.put(put);
|
||||||
|
Put put2 = new Put(Bytes.toBytes("19995"));
|
||||||
|
put2.addColumn(cf1, col, Bytes.toBytes("val"));
|
||||||
|
region.put(put2);
|
||||||
|
// create a reverse scan
|
||||||
|
Scan scan = new Scan(Bytes.toBytes("19996"));
|
||||||
|
scan.setReversed(true);
|
||||||
|
RegionScanner scanner = region.getScanner(scan);
|
||||||
|
|
||||||
|
// flush the cache. This will reset the store scanner
|
||||||
|
region.flushcache(true, true);
|
||||||
|
|
||||||
|
// create one memstore contains many rows will be skipped
|
||||||
|
// to check MemStoreScanner.seekToPreviousRow
|
||||||
|
for (int i = 10000; i < 20000; i++) {
|
||||||
|
Put p = new Put(Bytes.toBytes("" + i));
|
||||||
|
p.addColumn(cf1, col, Bytes.toBytes("" + i));
|
||||||
|
region.put(p);
|
||||||
|
}
|
||||||
|
List<Cell> currRow = new ArrayList<>();
|
||||||
|
boolean hasNext;
|
||||||
|
boolean assertDone = false;
|
||||||
|
do {
|
||||||
|
hasNext = scanner.next(currRow);
|
||||||
|
// With HBASE-15871, after the scanner is reset the memstore scanner should not be
|
||||||
|
// added here
|
||||||
|
if (!assertDone) {
|
||||||
|
StoreScanner current =
|
||||||
|
(StoreScanner) (((RegionScannerImpl) scanner).storeHeap).getCurrentForTesting();
|
||||||
|
List<KeyValueScanner> scanners = current.getAllScannersForTesting();
|
||||||
|
assertEquals("There should be only one scanner the store file scanner", 1,
|
||||||
|
scanners.size());
|
||||||
|
assertDone = true;
|
||||||
|
}
|
||||||
|
} while (hasNext);
|
||||||
|
assertEquals(2, currRow.size());
|
||||||
|
assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(),
|
||||||
|
currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
|
||||||
|
assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(),
|
||||||
|
currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
|
||||||
|
} finally {
|
||||||
|
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
||||||
|
this.region = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSplitRegionWithReverseScan() throws IOException {
|
public void testSplitRegionWithReverseScan() throws IOException {
|
||||||
TableName tableName = TableName.valueOf("testSplitRegionWithReverseScan");
|
TableName tableName = TableName.valueOf("testSplitRegionWithReverseScan");
|
||||||
|
|
Loading…
Reference in New Issue