HBASE-18295 The result contains the cells across different rows

Chia-Ping Tsai 2017-07-12 02:34:01 +08:00
parent 8a8e299eee
commit 4c699fd821
2 changed files with 232 additions and 42 deletions
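For context, the bug fixed here is that a scan could hand back a batch of cells spanning more than one row when a flush completed in the middle of the scan. Below is a minimal client-side sketch of the invariant the fix restores; it is illustrative only, is not part of this commit, and assumes the caller already has a client Table handle:

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class SingleRowResultCheck {
  // Scans the whole table and verifies that every Result holds cells of exactly one row.
  static void assertNoCrossRowResults(Table table) throws IOException {
    try (ResultScanner scanner = table.getScanner(new Scan())) {
      for (Result result : scanner) {
        byte[] expectedRow = result.getRow();
        for (Cell cell : result.rawCells()) {
          if (!Bytes.equals(expectedRow, CellUtil.cloneRow(cell))) {
            throw new IllegalStateException("Result contains cells across different rows");
          }
        }
      }
    }
  }
}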

StoreScanner.java

@@ -22,9 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
@@ -158,6 +156,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private final ReentrantLock flushLock = new ReentrantLock();
protected final long readPt;
private boolean topChanged = false;
// used by the injection framework to test race between StoreScanner construction and compaction
enum StoreScannerCompactionRace {
@@ -606,6 +605,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
bytesRead += cellSize;
prevCell = cell;
topChanged = false;
ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
switch (qcode) {
case INCLUDE:
@@ -692,10 +692,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
matcher.clearCurrentRow();
seekOrSkipToNextRow(cell);
NextState stateAfterSeekNextRow = needToReturn(outResult);
if (stateAfterSeekNextRow != null) {
return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
}
break;
case SEEK_NEXT_COL:
seekOrSkipToNextColumn(cell);
NextState stateAfterSeekNextColumn = needToReturn(outResult);
if (stateAfterSeekNextColumn != null) {
return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
}
break;
case SKIP:
@@ -706,6 +714,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
Cell nextKV = matcher.getNextKeyHint(cell);
if (nextKV != null) {
seekAsDirection(nextKV);
NextState stateAfterSeekByHint = needToReturn(outResult);
if (stateAfterSeekByHint != null) {
return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
}
} else {
heap.next();
}
@@ -725,6 +737,24 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
/**
* If the top cell won't be flushed to disk, the new top cell may change
* after #reopenAfterFlush, because the older top cell only exists in the
* memstore scanner, which is replaced by an hfile scanner after
* #reopenAfterFlush. If the row of the top cell changes, we should return
* the current cells; otherwise, we may return cells that span different
* rows.
* @param outResult the cells which are visible to the user scan
* @return null if the top cell doesn't change. Otherwise, the NextState
*         to return
*/
private NextState needToReturn(List<Cell> outResult) {
if (!outResult.isEmpty() && topChanged) {
return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
}
return null;
}
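Taken in isolation, the decision needToReturn encodes is small: stop the current next() call early only when cells have already been accumulated and the heap's top row moved underneath the scanner (for example because a flush replaced the memstore scanner). A standalone analogue, using plain types and illustrative names rather than StoreScanner's own fields, might look like this:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NeedToReturnAnalogue {
  // Mirrors the rule: stop early only when something was already accumulated
  // AND the top of the heap changed; report whether more values remain.
  static String decide(List<String> accumulated, boolean topChanged, boolean heapExhausted) {
    if (!accumulated.isEmpty() && topChanged) {
      return heapExhausted ? "NO_MORE_VALUES" : "MORE_VALUES";
    }
    return null; // keep scanning within the current call
  }

  public static void main(String[] args) {
    // Nothing accumulated yet: keep scanning even though the top changed.
    System.out.println(decide(Collections.<String>emptyList(), true, false)); // null
    // Cells accumulated and the top row moved: return them before crossing rows.
    System.out.println(decide(Arrays.asList("row1/qf1"), true, false));       // MORE_VALUES
    // Same, but the flush left nothing more to read.
    System.out.println(decide(Arrays.asList("row1/qf1"), true, true));        // NO_MORE_VALUES
  }
}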
private void seekOrSkipToNextRow(Cell cell) throws IOException {
// If it is a Get Scan, then we know that we are done with this row; there are no more
// rows beyond the current one: don't try to optimize.
@@ -901,13 +931,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
resetKVHeap(this.currentScanners, store.getComparator());
resetQueryMatcher(lastTop);
if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Storescanner.peek() is changed where before = " + lastTop.toString() +
",and after = " + heap.peek());
}
return true;
LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString() +
",and after = " + heap.peek());
topChanged = true;
} else {
topChanged = false;
}
return false;
return topChanged;
}
private void resetQueryMatcher(Cell lastTopKey) {

TestStore.java

@@ -106,6 +106,10 @@ import org.junit.rules.TestName;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test class for the Store
@@ -184,12 +188,12 @@ public class TestStore {
@SuppressWarnings("deprecation")
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
HColumnDescriptor hcd, MyStoreHook hook) throws IOException {
return init(methodName, conf, htd, hcd, hook, false);
}
@SuppressWarnings("deprecation")
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
HColumnDescriptor hcd, MyScannerHook hook, boolean switchToPread) throws IOException {
HColumnDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
//Setting up a Store
Path basedir = new Path(DIR+methodName);
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@@ -1054,6 +1058,150 @@ public class TestStore {
return c;
}
@Test
public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
if (currentSize == 2) {
try {
flushStore(store, id++);
timeToGoNextRow.set(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}, new FilterBase() {
@Override
public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
return ReturnCode.INCLUDE;
}
});
}
@Test
public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
if (currentSize == 2) {
try {
flushStore(store, id++);
timeToGoNextRow.set(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}, new FilterBase() {
@Override
public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
if (timeToGoNextRow.get()) {
timeToGoNextRow.set(false);
return ReturnCode.NEXT_ROW;
} else {
return ReturnCode.INCLUDE;
}
}
});
}
@Test
public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException {
final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
if (currentSize == 2) {
try {
flushStore(store, id++);
timeToGetHint.set(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}, new FilterBase() {
@Override
public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
if (timeToGetHint.get()) {
timeToGetHint.set(false);
return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
} else {
return Filter.ReturnCode.INCLUDE;
}
}
@Override
public Cell getNextCellHint(Cell currentCell) throws IOException {
return currentCell;
}
});
}
private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter)
throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setMaxVersions(1);
byte[] r0 = Bytes.toBytes("row0");
byte[] r1 = Bytes.toBytes("row1");
byte[] r2 = Bytes.toBytes("row2");
byte[] value0 = Bytes.toBytes("value0");
byte[] value1 = Bytes.toBytes("value1");
byte[] value2 = Bytes.toBytes("value2");
MemstoreSize memStoreSize = new MemstoreSize();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
init(name.getMethodName(), conf, new HTableDescriptor(TableName.valueOf(table)), hcd, new MyStoreHook() {
@Override
public long getSmallestReadPoint(HStore store) {
return seqId + 3;
}
});
// The cells with value0 won't be flushed to disk because max versions is set to 1
store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSize);
store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSize);
store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSize);
store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSize);
store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSize);
store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSize);
store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSize);
store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSize);
store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSize);
store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSize);
store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSize);
store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSize);
List<Cell> myList = new MyList<>(hook);
Scan scan = new Scan()
.withStartRow(r1)
.setFilter(filter);
try (InternalScanner scanner = (InternalScanner) store.getScanner(
scan, null, seqId + 3)){
// r1
scanner.next(myList);
assertEquals(3, myList.size());
for (Cell c : myList) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(value1)
+ ", actual:" + Bytes.toStringBinary(actualValue)
, Bytes.equals(actualValue, value1));
}
List<Cell> normalList = new ArrayList<>(3);
// r2
scanner.next(normalList);
assertEquals(3, normalList.size());
for (Cell c : normalList) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(value2)
+ ", actual:" + Bytes.toStringBinary(actualValue)
, Bytes.equals(actualValue, value2));
}
}
}
@Test
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
@@ -1114,25 +1262,28 @@ public class TestStore {
public void testScanWithDoubleFlush() throws IOException {
Configuration conf = HBaseConfiguration.create();
// Initialize region
MyStore myStore = initMyStore(name.getMethodName(), conf, (final MyStore store1) -> {
final long tmpId = id++;
ExecutorService s = Executors.newSingleThreadExecutor();
s.submit(() -> {
MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){
@Override
public void getScanners(MyStore store) throws IOException {
final long tmpId = id++;
ExecutorService s = Executors.newSingleThreadExecutor();
s.submit(() -> {
try {
// flush the store before storescanner updates the scanners from store.
// The current data will be flushed into files, and the memstore will
// be cleared.
// -- phase (4/4)
flushStore(store, tmpId);
}catch (IOException ex) {
throw new RuntimeException(ex);
}
});
s.shutdown();
try {
// flush the store before storescanner updates the scanners from store.
// The current data will be flushed into files, and the memstore will
// be cleared.
// -- phase (4/4)
flushStore(store1, tmpId);
}catch (IOException ex) {
throw new RuntimeException(ex);
// wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
s.awaitTermination(3, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
}
});
s.shutdown();
try {
// wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
s.awaitTermination(3, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
}
});
byte[] oldValue = Bytes.toBytes("oldValue");
@@ -1284,7 +1435,7 @@ public class TestStore {
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
}
private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook)
private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
HColumnDescriptor hcd = new HColumnDescriptor(family);
@@ -1293,10 +1444,10 @@ public class TestStore {
}
class MyStore extends HStore {
private final MyScannerHook hook;
private final MyStoreHook hook;
MyStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam,
MyScannerHook hook, boolean switchToPread) throws IOException {
MyStoreHook hook, boolean switchToPread) throws IOException {
super(region, family, confParam);
this.hook = hook;
}
@@ -1306,13 +1457,22 @@ public class TestStore {
boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
boolean includeMemstoreScanner) throws IOException {
hook.hook(this);
hook.getScanners(this);
return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
stopRow, false, readPt, includeMemstoreScanner);
}
@Override
public long getSmallestReadPoint() {
return hook.getSmallestReadPoint(this);
}
}
private interface MyScannerHook {
void hook(MyStore store) throws IOException;
private abstract class MyStoreHook {
void getScanners(MyStore store) throws IOException {
}
long getSmallestReadPoint(HStore store) {
return store.getHRegion().getSmallestReadPoint();
}
}
@Test
@@ -1324,12 +1484,7 @@ public class TestStore {
// Set the lower threshold to invoke the "MERGE" policy
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
MyStore store = initMyStore(name.getMethodName(), conf, new MyScannerHook() {
@Override
public void hook(org.apache.hadoop.hbase.regionserver.TestStore.MyStore store)
throws IOException {
}
});
MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
MemstoreSize memStoreSize = new MemstoreSize();
long ts = System.currentTimeMillis();
long seqID = 1l;
@@ -1491,10 +1646,15 @@ public class TestStore {
}
}
}
interface MyListHook {
void hook(int currentSize);
}
private static class MyList<T> implements List<T> {
private final List<T> delegatee = new ArrayList<>();
private final Consumer<Integer> hookAtAdd;
MyList(final Consumer<Integer> hookAtAdd) {
private final MyListHook hookAtAdd;
MyList(final MyListHook hookAtAdd) {
this.hookAtAdd = hookAtAdd;
}
@Override
@@ -1517,7 +1677,7 @@ public class TestStore {
@Override
public boolean add(T e) {
hookAtAdd.accept(size());
hookAtAdd.hook(size());
return delegatee.add(e);
}