diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index b92f9caa5d5..6cc5e4010d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -756,6 +756,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner default: throw new RuntimeException("UNEXPECTED"); } + + // when reaching the heartbeat cells, try to return from the loop. + if (kvsScanned % cellsPerHeartbeatCheck == 0) { + return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + } } while ((cell = this.heap.peek()) != null); if (count > 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 28717a8f550..f1cfcf16a2b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -7047,6 +7047,74 @@ public class TestHRegion { assertNull(r.getValue(fam1, q1)); } + @Test + public void testTTLsUsingSmallHeartBeatCells() throws IOException { + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(edge); + + final byte[] row = Bytes.toBytes("testRow"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] q3 = Bytes.toBytes("q3"); + final byte[] q4 = Bytes.toBytes("q4"); + final byte[] q5 = Bytes.toBytes("q5"); + final byte[] q6 = Bytes.toBytes("q6"); + final byte[] q7 = Bytes.toBytes("q7"); + final byte[] q8 = Bytes.toBytes("q8"); + + // 10 seconds + int ttlSecs = 10; + TableDescriptor tableDescriptor = + TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(fam1).setTimeToLive(ttlSecs).build()).build(); + + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); + // using small heart beat cells + conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2); + + region = HBaseTestingUtil + .createRegionAndWAL(RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(), + TEST_UTIL.getDataTestDir(), conf, tableDescriptor); + assertNotNull(region); + long now = EnvironmentEdgeManager.currentTime(); + // Add a cell that will expire in 5 seconds via cell TTL + region.put(new Put(row).addColumn(fam1, q1, now, HConstants.EMPTY_BYTE_ARRAY)); + region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY)); + region.put(new Put(row).addColumn(fam1, q3, now, HConstants.EMPTY_BYTE_ARRAY)); + // Add a cell that will expire after 10 seconds via family setting + region + .put(new Put(row).addColumn(fam1, q4, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY)); + region + .put(new Put(row).addColumn(fam1, q5, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY)); + + region.put(new Put(row).addColumn(fam1, q6, now, HConstants.EMPTY_BYTE_ARRAY)); + region.put(new Put(row).addColumn(fam1, q7, now, HConstants.EMPTY_BYTE_ARRAY)); + region + .put(new Put(row).addColumn(fam1, q8, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY)); + + // Flush so we are sure store scanning gets this right + region.flush(true); + + // A query at time T+0 should return all cells + checkScan(8); + + // Increment time to T+ttlSecs seconds + edge.incrementTime(ttlSecs * 1000); + checkScan(3); + } + + private void checkScan(int expectCellSize) throws IOException{ + Scan s = new Scan().withStartRow(row); + ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); + ScannerContext scannerContext = contextBuilder.build(); + RegionScanner scanner = region.getScanner(s); + List kvs = new ArrayList<>(); + scanner.next(kvs, scannerContext); + assertEquals(expectCellSize, kvs.size()); + scanner.close(); + } + @Test public void testIncrementTimestampsAreMonotonic() throws IOException { region = initHRegion(tableName, method, CONF, fam1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 7cc81938bd3..5543c03b650 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -1278,6 +1278,77 @@ public class TestHStore { } } + @Test + public void testPreventLoopRead() throws Exception { + init(this.name.getMethodName()); + Configuration conf = HBaseConfiguration.create(); + // use small heart beat cells + conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2); + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(edge); + byte[] r0 = Bytes.toBytes("row0"); + byte[] value0 = Bytes.toBytes("value0"); + byte[] value1 = Bytes.toBytes("value1"); + MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); + long ts = EnvironmentEdgeManager.currentTime(); + long seqId = 100; + init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), + ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(10).build(), + new MyStoreHook() { + @Override public long getSmallestReadPoint(HStore store) { + return seqId + 3; + } + }); + // The cells having the value0 will be expired + store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); + store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); + store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing); + store.add(createCell(r0, qf4, ts + 10000 + 1, seqId, value1), memStoreSizing); + store.add(createCell(r0, qf5, ts, seqId, value0), memStoreSizing); + store.add(createCell(r0, qf6, ts + 10000 + 1, seqId, value1), memStoreSizing); + + List myList = new ArrayList<>(); + Scan scan = new Scan().withStartRow(r0); + ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(false); + // test normal scan, should return all the cells + ScannerContext scannerContext = contextBuilder.build(); + try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, + seqId + 3)) { + scanner.next(myList, scannerContext); + assertEquals(6, myList.size()); + } + + // test skip two ttl cells and return with empty results, default prevent loop skip is on + edge.incrementTime(10 * 1000); + scannerContext = contextBuilder.build(); + myList.clear(); + try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, + seqId + 3)) { + // r0 + scanner.next(myList, scannerContext); + assertEquals(0, myList.size()); + } + + // should scan all non-ttl expired cells by iterative next + int resultCells = 0; + try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, + seqId + 3)) { + boolean hasMore = true; + while (hasMore) { + myList.clear(); + hasMore = scanner.next(myList, scannerContext); + assertTrue(myList.size() < 6); + resultCells += myList.size(); + } + for (Cell c : myList) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" + Bytes + .toStringBinary(actualValue), Bytes.equals(actualValue, value1)); + } + } + assertEquals(2, resultCells); + } + @Test public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { Configuration conf = HBaseConfiguration.create();