HBASE-18295 The result contains the cells across different rows
This commit is contained in:
parent
44651e52d8
commit
bec34ae432
|
@ -142,6 +142,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
private ReentrantLock flushLock = new ReentrantLock();
|
||||
|
||||
private final long readPt;
|
||||
private boolean topChanged = false;
|
||||
|
||||
// used by the injection framework to test race between StoreScanner construction and compaction
|
||||
enum StoreScannerCompactionRace {
|
||||
|
@ -547,7 +548,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
|
||||
checkScanOrder(prevCell, cell, comparator);
|
||||
prevCell = cell;
|
||||
|
||||
topChanged = false;
|
||||
ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
|
||||
switch (qcode) {
|
||||
case INCLUDE:
|
||||
|
@ -644,10 +645,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
// another compareRow to say the current row is DONE
|
||||
matcher.clearCurrentRow();
|
||||
seekOrSkipToNextRow(cell);
|
||||
NextState stateAfterSeekNextRow = needToReturn(outResult);
|
||||
if (stateAfterSeekNextRow != null) {
|
||||
return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
|
||||
}
|
||||
break;
|
||||
|
||||
case SEEK_NEXT_COL:
|
||||
seekOrSkipToNextColumn(cell);
|
||||
NextState stateAfterSeekNextColumn = needToReturn(outResult);
|
||||
if (stateAfterSeekNextColumn != null) {
|
||||
return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
|
||||
}
|
||||
break;
|
||||
|
||||
case SKIP:
|
||||
|
@ -658,6 +667,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
Cell nextKV = matcher.getNextKeyHint(cell);
|
||||
if (nextKV != null) {
|
||||
seekAsDirection(nextKV);
|
||||
NextState stateAfterSeekByHint = needToReturn(outResult);
|
||||
if (stateAfterSeekByHint != null) {
|
||||
return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
|
||||
}
|
||||
} else {
|
||||
heap.next();
|
||||
}
|
||||
|
@ -677,6 +690,24 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
||||
}
|
||||
|
||||
/**
|
||||
* If the top cell won't be flushed into disk, the new top cell may be
|
||||
* changed after #reopenAfterFlush. Because the older top cell only exist
|
||||
* in the memstore scanner but the memstore scanner is replaced by hfile
|
||||
* scanner after #reopenAfterFlush. If the row of top cell is changed,
|
||||
* we should return the current cells. Otherwise, we may return
|
||||
* the cells across different rows.
|
||||
* @param outResult the cells which are visible for user scan
|
||||
* @return null is the top cell doesn't change. Otherwise, the NextState
|
||||
* to return
|
||||
*/
|
||||
private NextState needToReturn(List<Cell> outResult) {
|
||||
if (!outResult.isEmpty() && topChanged) {
|
||||
return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void seekOrSkipToNextRow(Cell cell) throws IOException {
|
||||
// If it is a Get Scan, then we know that we are done with this row; there are no more
|
||||
// rows beyond the current one: don't try to optimize.
|
||||
|
@ -846,14 +877,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
resetScannerStack(this.lastTop);
|
||||
if (this.heap.peek() == null
|
||||
|| store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
|
||||
LOG.debug("Storescanner.peek() is changed where before = "
|
||||
LOG.info("Storescanner.peek() is changed where before = "
|
||||
+ this.lastTop.toString() + ",and after = " + this.heap.peek());
|
||||
this.lastTop = null;
|
||||
topChanged = true;
|
||||
return true;
|
||||
}
|
||||
this.lastTop = null; // gone!
|
||||
}
|
||||
// else dont need to reseek
|
||||
topChanged = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -68,6 +68,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
|
@ -93,6 +95,8 @@ import org.apache.hadoop.hbase.wal.WALFactory;
|
|||
import org.apache.hadoop.util.Progressable;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -178,7 +182,7 @@ public class TestStore {
|
|||
|
||||
@SuppressWarnings("deprecation")
|
||||
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
|
||||
HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
|
||||
HColumnDescriptor hcd, MyStoreHook hook) throws IOException {
|
||||
//Setting up a Store
|
||||
Path basedir = new Path(DIR+methodName);
|
||||
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
|
||||
|
@ -1154,6 +1158,10 @@ public class TestStore {
|
|||
}
|
||||
|
||||
private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
|
||||
return createCell(row, qualifier, ts, sequenceId, value);
|
||||
}
|
||||
|
||||
private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
|
||||
Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
|
||||
CellUtil.setSequenceId(c, sequenceId);
|
||||
return c;
|
||||
|
@ -1163,9 +1171,9 @@ public class TestStore {
|
|||
public void testScanWithDoubleFlush() throws IOException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
// Initialize region
|
||||
MyStore myStore = initMyStore(name.getMethodName(), conf, new MyScannerHook() {
|
||||
MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
|
||||
@Override
|
||||
public void hook(final MyStore store) throws IOException {
|
||||
public void getScanners(final MyStore store) throws IOException {
|
||||
final long tmpId = id++;
|
||||
ExecutorService s = Executors.newSingleThreadExecutor();
|
||||
s.submit(new Runnable() {
|
||||
|
@ -1230,6 +1238,149 @@ public class TestStore {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
|
||||
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
|
||||
testFlushBeforeCompletingScan(new MyListHook() {
|
||||
@Override
|
||||
public void hook(int currentSize) {
|
||||
if (currentSize == 2) {
|
||||
try {
|
||||
flushStore(store, id++);
|
||||
timeToGoNextRow.set(true);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}, new FilterBase() {
|
||||
@Override
|
||||
public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
|
||||
return Filter.ReturnCode.INCLUDE;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
|
||||
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
|
||||
testFlushBeforeCompletingScan(new MyListHook() {
|
||||
@Override
|
||||
public void hook(int currentSize) {
|
||||
if (currentSize == 2) {
|
||||
try {
|
||||
flushStore(store, id++);
|
||||
timeToGoNextRow.set(true);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}, new FilterBase() {
|
||||
@Override
|
||||
public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
|
||||
if (timeToGoNextRow.get()) {
|
||||
timeToGoNextRow.set(false);
|
||||
return Filter.ReturnCode.NEXT_ROW;
|
||||
} else {
|
||||
return Filter.ReturnCode.INCLUDE;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException {
|
||||
final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
|
||||
testFlushBeforeCompletingScan(new MyListHook() {
|
||||
@Override
|
||||
public void hook(int currentSize) {
|
||||
if (currentSize == 2) {
|
||||
try {
|
||||
flushStore(store, id++);
|
||||
timeToGetHint.set(true);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}, new FilterBase() {
|
||||
@Override
|
||||
public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
|
||||
if (timeToGetHint.get()) {
|
||||
timeToGetHint.set(false);
|
||||
return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
|
||||
} else {
|
||||
return Filter.ReturnCode.INCLUDE;
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public Cell getNextCellHint(Cell currentCell) throws IOException {
|
||||
return currentCell;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter)
|
||||
throws IOException, InterruptedException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
hcd.setMaxVersions(1);
|
||||
byte[] r0 = Bytes.toBytes("row0");
|
||||
byte[] r1 = Bytes.toBytes("row1");
|
||||
byte[] r2 = Bytes.toBytes("row2");
|
||||
byte[] value0 = Bytes.toBytes("value0");
|
||||
byte[] value1 = Bytes.toBytes("value1");
|
||||
byte[] value2 = Bytes.toBytes("value2");
|
||||
long ts = EnvironmentEdgeManager.currentTime();
|
||||
final long seqId = 100;
|
||||
init(name.getMethodName(), conf, new HTableDescriptor(TableName.valueOf(table)), hcd, new MyStoreHook() {
|
||||
@Override
|
||||
public long getSmallestReadPoint(HStore store) {
|
||||
return seqId + 3;
|
||||
}
|
||||
});
|
||||
// The cells having the value0 won't be flushed to disk because the value of max version is 1
|
||||
store.add(createCell(r0, qf1, ts, seqId, value0));
|
||||
store.add(createCell(r0, qf2, ts, seqId, value0));
|
||||
store.add(createCell(r0, qf3, ts, seqId, value0));
|
||||
store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1));
|
||||
store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1));
|
||||
store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1));
|
||||
store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2));
|
||||
store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2));
|
||||
store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2));
|
||||
store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1));
|
||||
store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1));
|
||||
store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1));
|
||||
List<Cell> myList = new MyList<>(hook);
|
||||
Scan scan = new Scan()
|
||||
.withStartRow(r1)
|
||||
.setFilter(filter);
|
||||
try (InternalScanner scanner = (InternalScanner) store.getScanner(
|
||||
scan, null, seqId + 3)){
|
||||
// r1
|
||||
scanner.next(myList);
|
||||
assertEquals(3, myList.size());
|
||||
for (Cell c : myList) {
|
||||
byte[] actualValue = CellUtil.cloneValue(c);
|
||||
assertTrue("expected:" + Bytes.toStringBinary(value1)
|
||||
+ ", actual:" + Bytes.toStringBinary(actualValue)
|
||||
, Bytes.equals(actualValue, value1));
|
||||
}
|
||||
List<Cell> normalList = new ArrayList<>(3);
|
||||
// r2
|
||||
scanner.next(normalList);
|
||||
assertEquals(3, normalList.size());
|
||||
for (Cell c : normalList) {
|
||||
byte[] actualValue = CellUtil.cloneValue(c);
|
||||
assertTrue("expected:" + Bytes.toStringBinary(value2)
|
||||
+ ", actual:" + Bytes.toStringBinary(actualValue)
|
||||
, Bytes.equals(actualValue, value2));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReclaimChunkWhenScaning() throws IOException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
|
@ -1291,7 +1442,7 @@ public class TestStore {
|
|||
}
|
||||
}
|
||||
|
||||
private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
|
||||
private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) throws IOException {
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
hcd.setMaxVersions(5);
|
||||
|
@ -1300,10 +1451,10 @@ public class TestStore {
|
|||
|
||||
private static class MyStore extends HStore {
|
||||
|
||||
private final MyScannerHook hook;
|
||||
private final MyStoreHook hook;
|
||||
|
||||
MyStore(final HRegion region, final HColumnDescriptor family,
|
||||
final Configuration confParam, MyScannerHook hook) throws IOException {
|
||||
final Configuration confParam, MyStoreHook hook) throws IOException {
|
||||
super(region, family, confParam);
|
||||
this.hook = hook;
|
||||
}
|
||||
|
@ -1312,15 +1463,23 @@ public class TestStore {
|
|||
public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
|
||||
boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
|
||||
byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
|
||||
hook.hook(this);
|
||||
hook.getScanners(this);
|
||||
return super.getScanners(files, cacheBlocks, isGet, usePread,
|
||||
isCompaction, matcher, startRow, stopRow, readPt, includeMemstoreScanner);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSmallestReadPoint() {
|
||||
return hook.getSmallestReadPoint(this);
|
||||
}
|
||||
}
|
||||
|
||||
private interface MyScannerHook {
|
||||
|
||||
void hook(MyStore store) throws IOException;
|
||||
private abstract class MyStoreHook {
|
||||
void getScanners(MyStore store) throws IOException {
|
||||
}
|
||||
long getSmallestReadPoint(HStore store) {
|
||||
return store.getHRegion().getSmallestReadPoint();
|
||||
}
|
||||
}
|
||||
|
||||
interface MyListHook {
|
||||
|
|
Loading…
Reference in New Issue