HBASE-25709 Close region may stuck when region is compacting and skipped most cells read (#3117)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
fee9bb0c2f
commit
f3a48d1910
|
@ -756,6 +756,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
default:
|
||||
throw new RuntimeException("UNEXPECTED");
|
||||
}
|
||||
|
||||
// when reaching the heartbeat cells, try to return from the loop.
|
||||
if (kvsScanned % cellsPerHeartbeatCheck == 0) {
|
||||
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
|
||||
}
|
||||
} while ((cell = this.heap.peek()) != null);
|
||||
|
||||
if (count > 0) {
|
||||
|
|
|
@ -7047,6 +7047,74 @@ public class TestHRegion {
|
|||
assertNull(r.getValue(fam1, q1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTTLsUsingSmallHeartBeatCells() throws IOException {
|
||||
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
|
||||
EnvironmentEdgeManager.injectEdge(edge);
|
||||
|
||||
final byte[] row = Bytes.toBytes("testRow");
|
||||
final byte[] q1 = Bytes.toBytes("q1");
|
||||
final byte[] q2 = Bytes.toBytes("q2");
|
||||
final byte[] q3 = Bytes.toBytes("q3");
|
||||
final byte[] q4 = Bytes.toBytes("q4");
|
||||
final byte[] q5 = Bytes.toBytes("q5");
|
||||
final byte[] q6 = Bytes.toBytes("q6");
|
||||
final byte[] q7 = Bytes.toBytes("q7");
|
||||
final byte[] q8 = Bytes.toBytes("q8");
|
||||
|
||||
// 10 seconds
|
||||
int ttlSecs = 10;
|
||||
TableDescriptor tableDescriptor =
|
||||
TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setColumnFamily(
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(fam1).setTimeToLive(ttlSecs).build()).build();
|
||||
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
|
||||
// using small heart beat cells
|
||||
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
|
||||
|
||||
region = HBaseTestingUtil
|
||||
.createRegionAndWAL(RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(),
|
||||
TEST_UTIL.getDataTestDir(), conf, tableDescriptor);
|
||||
assertNotNull(region);
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
// Add a cell that will expire in 5 seconds via cell TTL
|
||||
region.put(new Put(row).addColumn(fam1, q1, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||
region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||
region.put(new Put(row).addColumn(fam1, q3, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||
// Add a cell that will expire after 10 seconds via family setting
|
||||
region
|
||||
.put(new Put(row).addColumn(fam1, q4, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
|
||||
region
|
||||
.put(new Put(row).addColumn(fam1, q5, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
|
||||
|
||||
region.put(new Put(row).addColumn(fam1, q6, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||
region.put(new Put(row).addColumn(fam1, q7, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||
region
|
||||
.put(new Put(row).addColumn(fam1, q8, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
|
||||
|
||||
// Flush so we are sure store scanning gets this right
|
||||
region.flush(true);
|
||||
|
||||
// A query at time T+0 should return all cells
|
||||
checkScan(8);
|
||||
|
||||
// Increment time to T+ttlSecs seconds
|
||||
edge.incrementTime(ttlSecs * 1000);
|
||||
checkScan(3);
|
||||
}
|
||||
|
||||
private void checkScan(int expectCellSize) throws IOException{
|
||||
Scan s = new Scan().withStartRow(row);
|
||||
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
|
||||
ScannerContext scannerContext = contextBuilder.build();
|
||||
RegionScanner scanner = region.getScanner(s);
|
||||
List<Cell> kvs = new ArrayList<>();
|
||||
scanner.next(kvs, scannerContext);
|
||||
assertEquals(expectCellSize, kvs.size());
|
||||
scanner.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementTimestampsAreMonotonic() throws IOException {
|
||||
region = initHRegion(tableName, method, CONF, fam1);
|
||||
|
|
|
@ -1278,6 +1278,77 @@ public class TestHStore {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreventLoopRead() throws Exception {
|
||||
init(this.name.getMethodName());
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
// use small heart beat cells
|
||||
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
|
||||
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
|
||||
EnvironmentEdgeManager.injectEdge(edge);
|
||||
byte[] r0 = Bytes.toBytes("row0");
|
||||
byte[] value0 = Bytes.toBytes("value0");
|
||||
byte[] value1 = Bytes.toBytes("value1");
|
||||
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
long ts = EnvironmentEdgeManager.currentTime();
|
||||
long seqId = 100;
|
||||
init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(10).build(),
|
||||
new MyStoreHook() {
|
||||
@Override public long getSmallestReadPoint(HStore store) {
|
||||
return seqId + 3;
|
||||
}
|
||||
});
|
||||
// The cells having the value0 will be expired
|
||||
store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
|
||||
store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
|
||||
store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
|
||||
store.add(createCell(r0, qf4, ts + 10000 + 1, seqId, value1), memStoreSizing);
|
||||
store.add(createCell(r0, qf5, ts, seqId, value0), memStoreSizing);
|
||||
store.add(createCell(r0, qf6, ts + 10000 + 1, seqId, value1), memStoreSizing);
|
||||
|
||||
List<Cell> myList = new ArrayList<>();
|
||||
Scan scan = new Scan().withStartRow(r0);
|
||||
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(false);
|
||||
// test normal scan, should return all the cells
|
||||
ScannerContext scannerContext = contextBuilder.build();
|
||||
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null,
|
||||
seqId + 3)) {
|
||||
scanner.next(myList, scannerContext);
|
||||
assertEquals(6, myList.size());
|
||||
}
|
||||
|
||||
// test skip two ttl cells and return with empty results, default prevent loop skip is on
|
||||
edge.incrementTime(10 * 1000);
|
||||
scannerContext = contextBuilder.build();
|
||||
myList.clear();
|
||||
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null,
|
||||
seqId + 3)) {
|
||||
// r0
|
||||
scanner.next(myList, scannerContext);
|
||||
assertEquals(0, myList.size());
|
||||
}
|
||||
|
||||
// should scan all non-ttl expired cells by iterative next
|
||||
int resultCells = 0;
|
||||
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null,
|
||||
seqId + 3)) {
|
||||
boolean hasMore = true;
|
||||
while (hasMore) {
|
||||
myList.clear();
|
||||
hasMore = scanner.next(myList, scannerContext);
|
||||
assertTrue(myList.size() < 6);
|
||||
resultCells += myList.size();
|
||||
}
|
||||
for (Cell c : myList) {
|
||||
byte[] actualValue = CellUtil.cloneValue(c);
|
||||
assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" + Bytes
|
||||
.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
|
||||
}
|
||||
}
|
||||
assertEquals(2, resultCells);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
|
|
Loading…
Reference in New Issue