HBASE-18295 The result contains the cells across different rows

This commit is contained in:
Chia-Ping Tsai 2017-07-12 02:27:29 +08:00
parent 1978b78cdf
commit d215cb4950
2 changed files with 232 additions and 42 deletions

View File

@ -22,9 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -158,6 +156,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private final ReentrantLock flushLock = new ReentrantLock(); private final ReentrantLock flushLock = new ReentrantLock();
protected final long readPt; protected final long readPt;
private boolean topChanged = false;
// used by the injection framework to test race between StoreScanner construction and compaction // used by the injection framework to test race between StoreScanner construction and compaction
enum StoreScannerCompactionRace { enum StoreScannerCompactionRace {
@ -606,6 +605,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
int cellSize = CellUtil.estimatedSerializedSizeOf(cell); int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
bytesRead += cellSize; bytesRead += cellSize;
prevCell = cell; prevCell = cell;
topChanged = false;
ScanQueryMatcher.MatchCode qcode = matcher.match(cell); ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
switch (qcode) { switch (qcode) {
case INCLUDE: case INCLUDE:
@ -692,10 +692,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
} }
matcher.clearCurrentRow(); matcher.clearCurrentRow();
seekOrSkipToNextRow(cell); seekOrSkipToNextRow(cell);
NextState stateAfterSeekNextRow = needToReturn(outResult);
if (stateAfterSeekNextRow != null) {
return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
}
break; break;
case SEEK_NEXT_COL: case SEEK_NEXT_COL:
seekOrSkipToNextColumn(cell); seekOrSkipToNextColumn(cell);
NextState stateAfterSeekNextColumn = needToReturn(outResult);
if (stateAfterSeekNextColumn != null) {
return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
}
break; break;
case SKIP: case SKIP:
@ -706,6 +714,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
Cell nextKV = matcher.getNextKeyHint(cell); Cell nextKV = matcher.getNextKeyHint(cell);
if (nextKV != null) { if (nextKV != null) {
seekAsDirection(nextKV); seekAsDirection(nextKV);
NextState stateAfterSeekByHint = needToReturn(outResult);
if (stateAfterSeekByHint != null) {
return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
}
} else { } else {
heap.next(); heap.next();
} }
@ -725,6 +737,24 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} }
/**
* If the top cell won't be flushed into disk, the new top cell may be
* changed after #reopenAfterFlush. Because the older top cell only exist
* in the memstore scanner but the memstore scanner is replaced by hfile
* scanner after #reopenAfterFlush. If the row of top cell is changed,
* we should return the current cells. Otherwise, we may return
* the cells across different rows.
* @param outResult the cells which are visible for user scan
* @return null is the top cell doesn't change. Otherwise, the NextState
* to return
*/
private NextState needToReturn(List<Cell> outResult) {
if (!outResult.isEmpty() && topChanged) {
return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
}
return null;
}
private void seekOrSkipToNextRow(Cell cell) throws IOException { private void seekOrSkipToNextRow(Cell cell) throws IOException {
// If it is a Get Scan, then we know that we are done with this row; there are no more // If it is a Get Scan, then we know that we are done with this row; there are no more
// rows beyond the current one: don't try to optimize. // rows beyond the current one: don't try to optimize.
@ -901,13 +931,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
resetKVHeap(this.currentScanners, store.getComparator()); resetKVHeap(this.currentScanners, store.getComparator());
resetQueryMatcher(lastTop); resetQueryMatcher(lastTop);
if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) { if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
if (LOG.isDebugEnabled()) { LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString() +
LOG.debug("Storescanner.peek() is changed where before = " + lastTop.toString() +
",and after = " + heap.peek()); ",and after = " + heap.peek());
topChanged = true;
} else {
topChanged = false;
} }
return true; return topChanged;
}
return false;
} }
private void resetQueryMatcher(Cell lastTopKey) { private void resetQueryMatcher(Cell lastTopKey) {

View File

@ -106,6 +106,10 @@ import org.junit.rules.TestName;
import org.mockito.Mockito; import org.mockito.Mockito;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** /**
* Test class for the Store * Test class for the Store
@ -184,12 +188,12 @@ public class TestStore {
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
private Store init(String methodName, Configuration conf, HTableDescriptor htd, private Store init(String methodName, Configuration conf, HTableDescriptor htd,
HColumnDescriptor hcd, MyScannerHook hook) throws IOException { HColumnDescriptor hcd, MyStoreHook hook) throws IOException {
return init(methodName, conf, htd, hcd, hook, false); return init(methodName, conf, htd, hcd, hook, false);
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
private Store init(String methodName, Configuration conf, HTableDescriptor htd, private Store init(String methodName, Configuration conf, HTableDescriptor htd,
HColumnDescriptor hcd, MyScannerHook hook, boolean switchToPread) throws IOException { HColumnDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
//Setting up a Store //Setting up a Store
Path basedir = new Path(DIR+methodName); Path basedir = new Path(DIR+methodName);
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@ -1060,6 +1064,150 @@ public class TestStore {
return c; return c;
} }
@Test
public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
if (currentSize == 2) {
try {
flushStore(store, id++);
timeToGoNextRow.set(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}, new FilterBase() {
@Override
public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
return ReturnCode.INCLUDE;
}
});
}
@Test
public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
if (currentSize == 2) {
try {
flushStore(store, id++);
timeToGoNextRow.set(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}, new FilterBase() {
@Override
public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
if (timeToGoNextRow.get()) {
timeToGoNextRow.set(false);
return ReturnCode.NEXT_ROW;
} else {
return ReturnCode.INCLUDE;
}
}
});
}
@Test
public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException {
final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
if (currentSize == 2) {
try {
flushStore(store, id++);
timeToGetHint.set(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}, new FilterBase() {
@Override
public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
if (timeToGetHint.get()) {
timeToGetHint.set(false);
return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
} else {
return Filter.ReturnCode.INCLUDE;
}
}
@Override
public Cell getNextCellHint(Cell currentCell) throws IOException {
return currentCell;
}
});
}
private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter)
throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setMaxVersions(1);
byte[] r0 = Bytes.toBytes("row0");
byte[] r1 = Bytes.toBytes("row1");
byte[] r2 = Bytes.toBytes("row2");
byte[] value0 = Bytes.toBytes("value0");
byte[] value1 = Bytes.toBytes("value1");
byte[] value2 = Bytes.toBytes("value2");
MemstoreSize memStoreSize = new MemstoreSize();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
init(name.getMethodName(), conf, new HTableDescriptor(TableName.valueOf(table)), hcd, new MyStoreHook() {
@Override
public long getSmallestReadPoint(HStore store) {
return seqId + 3;
}
});
// The cells having the value0 won't be flushed to disk because the value of max version is 1
store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSize);
store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSize);
store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSize);
store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSize);
store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSize);
store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSize);
store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSize);
store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSize);
store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSize);
store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSize);
store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSize);
store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSize);
List<Cell> myList = new MyList<>(hook);
Scan scan = new Scan()
.withStartRow(r1)
.setFilter(filter);
try (InternalScanner scanner = (InternalScanner) store.getScanner(
scan, null, seqId + 3)){
// r1
scanner.next(myList);
assertEquals(3, myList.size());
for (Cell c : myList) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(value1)
+ ", actual:" + Bytes.toStringBinary(actualValue)
, Bytes.equals(actualValue, value1));
}
List<Cell> normalList = new ArrayList<>(3);
// r2
scanner.next(normalList);
assertEquals(3, normalList.size());
for (Cell c : normalList) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(value2)
+ ", actual:" + Bytes.toStringBinary(actualValue)
, Bytes.equals(actualValue, value2));
}
}
}
@Test @Test
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
@ -1120,7 +1268,9 @@ public class TestStore {
public void testScanWithDoubleFlush() throws IOException { public void testScanWithDoubleFlush() throws IOException {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
// Initialize region // Initialize region
MyStore myStore = initMyStore(name.getMethodName(), conf, (final MyStore store1) -> { MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){
@Override
public void getScanners(MyStore store) throws IOException {
final long tmpId = id++; final long tmpId = id++;
ExecutorService s = Executors.newSingleThreadExecutor(); ExecutorService s = Executors.newSingleThreadExecutor();
s.submit(() -> { s.submit(() -> {
@ -1129,7 +1279,7 @@ public class TestStore {
// The current data will be flushed into files, and the memstore will // The current data will be flushed into files, and the memstore will
// be clear. // be clear.
// -- phase (4/4) // -- phase (4/4)
flushStore(store1, tmpId); flushStore(store, tmpId);
}catch (IOException ex) { }catch (IOException ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
@ -1140,6 +1290,7 @@ public class TestStore {
s.awaitTermination(3, TimeUnit.SECONDS); s.awaitTermination(3, TimeUnit.SECONDS);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
} }
}
}); });
byte[] oldValue = Bytes.toBytes("oldValue"); byte[] oldValue = Bytes.toBytes("oldValue");
byte[] currentValue = Bytes.toBytes("currentValue"); byte[] currentValue = Bytes.toBytes("currentValue");
@ -1290,7 +1441,7 @@ public class TestStore {
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
} }
private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
throws IOException { throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
HColumnDescriptor hcd = new HColumnDescriptor(family); HColumnDescriptor hcd = new HColumnDescriptor(family);
@ -1299,10 +1450,10 @@ public class TestStore {
} }
class MyStore extends HStore { class MyStore extends HStore {
private final MyScannerHook hook; private final MyStoreHook hook;
MyStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam, MyStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam,
MyScannerHook hook, boolean switchToPread) throws IOException { MyStoreHook hook, boolean switchToPread) throws IOException {
super(region, family, confParam); super(region, family, confParam);
this.hook = hook; this.hook = hook;
} }
@ -1312,13 +1463,22 @@ public class TestStore {
boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
boolean includeMemstoreScanner) throws IOException { boolean includeMemstoreScanner) throws IOException {
hook.hook(this); hook.getScanners(this);
return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
stopRow, false, readPt, includeMemstoreScanner); stopRow, false, readPt, includeMemstoreScanner);
} }
@Override
public long getSmallestReadPoint() {
return hook.getSmallestReadPoint(this);
}
}
private abstract class MyStoreHook {
void getScanners(MyStore store) throws IOException {
}
long getSmallestReadPoint(HStore store) {
return store.getHRegion().getSmallestReadPoint();
} }
private interface MyScannerHook {
void hook(MyStore store) throws IOException;
} }
@Test @Test
@ -1330,12 +1490,7 @@ public class TestStore {
// Set the lower threshold to invoke the "MERGE" policy // Set the lower threshold to invoke the "MERGE" policy
HColumnDescriptor hcd = new HColumnDescriptor(family); HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
MyStore store = initMyStore(name.getMethodName(), conf, new MyScannerHook() { MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
@Override
public void hook(org.apache.hadoop.hbase.regionserver.TestStore.MyStore store)
throws IOException {
}
});
MemstoreSize memStoreSize = new MemstoreSize(); MemstoreSize memStoreSize = new MemstoreSize();
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
long seqID = 1l; long seqID = 1l;
@ -1497,10 +1652,15 @@ public class TestStore {
} }
} }
} }
interface MyListHook {
void hook(int currentSize);
}
private static class MyList<T> implements List<T> { private static class MyList<T> implements List<T> {
private final List<T> delegatee = new ArrayList<>(); private final List<T> delegatee = new ArrayList<>();
private final Consumer<Integer> hookAtAdd; private final MyListHook hookAtAdd;
MyList(final Consumer<Integer> hookAtAdd) { MyList(final MyListHook hookAtAdd) {
this.hookAtAdd = hookAtAdd; this.hookAtAdd = hookAtAdd;
} }
@Override @Override
@ -1523,7 +1683,7 @@ public class TestStore {
@Override @Override
public boolean add(T e) { public boolean add(T e) {
hookAtAdd.accept(size()); hookAtAdd.hook(size());
return delegatee.add(e); return delegatee.add(e);
} }