diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 11301d865b6..1fcb3143f16 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -22,9 +22,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.NavigableSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; @@ -158,6 +156,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private final ReentrantLock flushLock = new ReentrantLock(); protected final long readPt; + private boolean topChanged = false; // used by the injection framework to test race between StoreScanner construction and compaction enum StoreScannerCompactionRace { @@ -606,6 +605,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner int cellSize = CellUtil.estimatedSerializedSizeOf(cell); bytesRead += cellSize; prevCell = cell; + topChanged = false; ScanQueryMatcher.MatchCode qcode = matcher.match(cell); switch (qcode) { case INCLUDE: @@ -692,10 +692,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } matcher.clearCurrentRow(); seekOrSkipToNextRow(cell); + NextState stateAfterSeekNextRow = needToReturn(outResult); + if (stateAfterSeekNextRow != null) { + return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); + } break; case SEEK_NEXT_COL: seekOrSkipToNextColumn(cell); + NextState stateAfterSeekNextColumn = needToReturn(outResult); + if (stateAfterSeekNextColumn != null) { + return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); + } break; case SKIP: @@ -706,6 +714,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner Cell nextKV = matcher.getNextKeyHint(cell); if (nextKV != null) { seekAsDirection(nextKV); + NextState stateAfterSeekByHint = needToReturn(outResult); + if (stateAfterSeekByHint != null) { + return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); + } } else { heap.next(); } @@ -725,6 +737,24 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } + /** + * If the top cell won't be flushed into disk, the new top cell may be + * changed after #reopenAfterFlush. Because the older top cell only exist + * in the memstore scanner but the memstore scanner is replaced by hfile + * scanner after #reopenAfterFlush. If the row of top cell is changed, + * we should return the current cells. Otherwise, we may return + * the cells across different rows. + * @param outResult the cells which are visible for user scan + * @return null is the top cell doesn't change. Otherwise, the NextState + * to return + */ + private NextState needToReturn(List outResult) { + if (!outResult.isEmpty() && topChanged) { + return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES; + } + return null; + } + private void seekOrSkipToNextRow(Cell cell) throws IOException { // If it is a Get Scan, then we know that we are done with this row; there are no more // rows beyond the current one: don't try to optimize. @@ -901,13 +931,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner resetKVHeap(this.currentScanners, store.getComparator()); resetQueryMatcher(lastTop); if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Storescanner.peek() is changed where before = " + lastTop.toString() + - ",and after = " + heap.peek()); - } - return true; + LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString() + + ",and after = " + heap.peek()); + topChanged = true; + } else { + topChanged = false; } - return false; + return topChanged; } private void resetQueryMatcher(Cell lastTopKey) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 9a49f07c38f..56a61821f1e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -106,6 +106,10 @@ import org.junit.rules.TestName; import org.mockito.Mockito; import com.google.common.collect.Lists; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Test class for the Store @@ -184,12 +188,12 @@ public class TestStore { @SuppressWarnings("deprecation") private Store init(String methodName, Configuration conf, HTableDescriptor htd, - HColumnDescriptor hcd, MyScannerHook hook) throws IOException { + HColumnDescriptor hcd, MyStoreHook hook) throws IOException { return init(methodName, conf, htd, hcd, hook, false); } @SuppressWarnings("deprecation") private Store init(String methodName, Configuration conf, HTableDescriptor htd, - HColumnDescriptor hcd, MyScannerHook hook, boolean switchToPread) throws IOException { + HColumnDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { //Setting up a Store Path basedir = new Path(DIR+methodName); Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); @@ -1060,6 +1064,150 @@ public class TestStore { return c; } + @Test + public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { + final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); + testFlushBeforeCompletingScan(new MyListHook() { + @Override + public void hook(int currentSize) { + if (currentSize == 2) { + try { + flushStore(store, id++); + timeToGoNextRow.set(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + }, new FilterBase() { + @Override + public Filter.ReturnCode filterKeyValue(Cell v) throws IOException { + return ReturnCode.INCLUDE; + } + }); + } + + @Test + public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { + final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); + testFlushBeforeCompletingScan(new MyListHook() { + @Override + public void hook(int currentSize) { + if (currentSize == 2) { + try { + flushStore(store, id++); + timeToGoNextRow.set(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + }, new FilterBase() { + @Override + public Filter.ReturnCode filterKeyValue(Cell v) throws IOException { + if (timeToGoNextRow.get()) { + timeToGoNextRow.set(false); + return ReturnCode.NEXT_ROW; + } else { + return ReturnCode.INCLUDE; + } + } + }); + } + + @Test + public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException { + final AtomicBoolean timeToGetHint = new AtomicBoolean(false); + testFlushBeforeCompletingScan(new MyListHook() { + @Override + public void hook(int currentSize) { + if (currentSize == 2) { + try { + flushStore(store, id++); + timeToGetHint.set(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + }, new FilterBase() { + @Override + public Filter.ReturnCode filterKeyValue(Cell v) throws IOException { + if (timeToGetHint.get()) { + timeToGetHint.set(false); + return Filter.ReturnCode.SEEK_NEXT_USING_HINT; + } else { + return Filter.ReturnCode.INCLUDE; + } + } + @Override + public Cell getNextCellHint(Cell currentCell) throws IOException { + return currentCell; + } + }); + } + + private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter) + throws IOException, InterruptedException { + Configuration conf = HBaseConfiguration.create(); + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setMaxVersions(1); + byte[] r0 = Bytes.toBytes("row0"); + byte[] r1 = Bytes.toBytes("row1"); + byte[] r2 = Bytes.toBytes("row2"); + byte[] value0 = Bytes.toBytes("value0"); + byte[] value1 = Bytes.toBytes("value1"); + byte[] value2 = Bytes.toBytes("value2"); + MemstoreSize memStoreSize = new MemstoreSize(); + long ts = EnvironmentEdgeManager.currentTime(); + long seqId = 100; + init(name.getMethodName(), conf, new HTableDescriptor(TableName.valueOf(table)), hcd, new MyStoreHook() { + @Override + public long getSmallestReadPoint(HStore store) { + return seqId + 3; + } + }); + // The cells having the value0 won't be flushed to disk because the value of max version is 1 + store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSize); + store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSize); + store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSize); + store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSize); + store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSize); + store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSize); + store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSize); + store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSize); + store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSize); + store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSize); + store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSize); + store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSize); + List myList = new MyList<>(hook); + Scan scan = new Scan() + .withStartRow(r1) + .setFilter(filter); + try (InternalScanner scanner = (InternalScanner) store.getScanner( + scan, null, seqId + 3)){ + // r1 + scanner.next(myList); + assertEquals(3, myList.size()); + for (Cell c : myList) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(value1) + + ", actual:" + Bytes.toStringBinary(actualValue) + , Bytes.equals(actualValue, value1)); + } + List normalList = new ArrayList<>(3); + // r2 + scanner.next(normalList); + assertEquals(3, normalList.size()); + for (Cell c : normalList) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(value2) + + ", actual:" + Bytes.toStringBinary(actualValue) + , Bytes.equals(actualValue, value2)); + } + } + } + @Test public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { Configuration conf = HBaseConfiguration.create(); @@ -1120,25 +1268,28 @@ public class TestStore { public void testScanWithDoubleFlush() throws IOException { Configuration conf = HBaseConfiguration.create(); // Initialize region - MyStore myStore = initMyStore(name.getMethodName(), conf, (final MyStore store1) -> { - final long tmpId = id++; - ExecutorService s = Executors.newSingleThreadExecutor(); - s.submit(() -> { + MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){ + @Override + public void getScanners(MyStore store) throws IOException { + final long tmpId = id++; + ExecutorService s = Executors.newSingleThreadExecutor(); + s.submit(() -> { + try { + // flush the store before storescanner updates the scanners from store. + // The current data will be flushed into files, and the memstore will + // be clear. + // -- phase (4/4) + flushStore(store, tmpId); + }catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + s.shutdown(); try { - // flush the store before storescanner updates the scanners from store. - // The current data will be flushed into files, and the memstore will - // be clear. - // -- phase (4/4) - flushStore(store1, tmpId); - }catch (IOException ex) { - throw new RuntimeException(ex); + // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. + s.awaitTermination(3, TimeUnit.SECONDS); + } catch (InterruptedException ex) { } - }); - s.shutdown(); - try { - // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. - s.awaitTermination(3, TimeUnit.SECONDS); - } catch (InterruptedException ex) { } }); byte[] oldValue = Bytes.toBytes("oldValue"); @@ -1290,7 +1441,7 @@ public class TestStore { storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); } - private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) + private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) throws IOException { HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); HColumnDescriptor hcd = new HColumnDescriptor(family); @@ -1299,10 +1450,10 @@ public class TestStore { } class MyStore extends HStore { - private final MyScannerHook hook; + private final MyStoreHook hook; MyStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam, - MyScannerHook hook, boolean switchToPread) throws IOException { + MyStoreHook hook, boolean switchToPread) throws IOException { super(region, family, confParam); this.hook = hook; } @@ -1312,13 +1463,22 @@ public class TestStore { boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, boolean includeMemstoreScanner) throws IOException { - hook.hook(this); + hook.getScanners(this); return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false, readPt, includeMemstoreScanner); } + @Override + public long getSmallestReadPoint() { + return hook.getSmallestReadPoint(this); + } } - private interface MyScannerHook { - void hook(MyStore store) throws IOException; + + private abstract class MyStoreHook { + void getScanners(MyStore store) throws IOException { + } + long getSmallestReadPoint(HStore store) { + return store.getHRegion().getSmallestReadPoint(); + } } @Test @@ -1330,12 +1490,7 @@ public class TestStore { // Set the lower threshold to invoke the "MERGE" policy HColumnDescriptor hcd = new HColumnDescriptor(family); hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); - MyStore store = initMyStore(name.getMethodName(), conf, new MyScannerHook() { - @Override - public void hook(org.apache.hadoop.hbase.regionserver.TestStore.MyStore store) - throws IOException { - } - }); + MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {}); MemstoreSize memStoreSize = new MemstoreSize(); long ts = System.currentTimeMillis(); long seqID = 1l; @@ -1497,10 +1652,15 @@ public class TestStore { } } } + + interface MyListHook { + void hook(int currentSize); + } + private static class MyList implements List { private final List delegatee = new ArrayList<>(); - private final Consumer hookAtAdd; - MyList(final Consumer hookAtAdd) { + private final MyListHook hookAtAdd; + MyList(final MyListHook hookAtAdd) { this.hookAtAdd = hookAtAdd; } @Override @@ -1523,7 +1683,7 @@ public class TestStore { @Override public boolean add(T e) { - hookAtAdd.accept(size()); + hookAtAdd.hook(size()); return delegatee.add(e); }