HBASE-25709 Close region may stuck when region is compacting and skipped most cells read (#3117)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
70ec3beef4
commit
bf9233f511
|
@ -756,6 +756,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("UNEXPECTED");
|
throw new RuntimeException("UNEXPECTED");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// when reaching the heartbeat cells, try to return from the loop.
|
||||||
|
if (kvsScanned % cellsPerHeartbeatCheck == 0) {
|
||||||
|
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
|
||||||
|
}
|
||||||
} while ((cell = this.heap.peek()) != null);
|
} while ((cell = this.heap.peek()) != null);
|
||||||
|
|
||||||
if (count > 0) {
|
if (count > 0) {
|
||||||
|
|
|
@ -7029,6 +7029,74 @@ public class TestHRegion {
|
||||||
assertNull(r.getValue(fam1, q1));
|
assertNull(r.getValue(fam1, q1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTTLsUsingSmallHeartBeatCells() throws IOException {
|
||||||
|
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
|
||||||
|
EnvironmentEdgeManager.injectEdge(edge);
|
||||||
|
|
||||||
|
final byte[] row = Bytes.toBytes("testRow");
|
||||||
|
final byte[] q1 = Bytes.toBytes("q1");
|
||||||
|
final byte[] q2 = Bytes.toBytes("q2");
|
||||||
|
final byte[] q3 = Bytes.toBytes("q3");
|
||||||
|
final byte[] q4 = Bytes.toBytes("q4");
|
||||||
|
final byte[] q5 = Bytes.toBytes("q5");
|
||||||
|
final byte[] q6 = Bytes.toBytes("q6");
|
||||||
|
final byte[] q7 = Bytes.toBytes("q7");
|
||||||
|
final byte[] q8 = Bytes.toBytes("q8");
|
||||||
|
|
||||||
|
// 10 seconds
|
||||||
|
int ttlSecs = 10;
|
||||||
|
TableDescriptor tableDescriptor =
|
||||||
|
TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setColumnFamily(
|
||||||
|
ColumnFamilyDescriptorBuilder.newBuilder(fam1).setTimeToLive(ttlSecs).build()).build();
|
||||||
|
|
||||||
|
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
|
||||||
|
// using small heart beat cells
|
||||||
|
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
|
||||||
|
|
||||||
|
region = HBaseTestingUtil
|
||||||
|
.createRegionAndWAL(RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(),
|
||||||
|
TEST_UTIL.getDataTestDir(), conf, tableDescriptor);
|
||||||
|
assertNotNull(region);
|
||||||
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
|
// Add a cell that will expire in 5 seconds via cell TTL
|
||||||
|
region.put(new Put(row).addColumn(fam1, q1, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
region.put(new Put(row).addColumn(fam1, q3, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
// Add a cell that will expire after 10 seconds via family setting
|
||||||
|
region
|
||||||
|
.put(new Put(row).addColumn(fam1, q4, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
region
|
||||||
|
.put(new Put(row).addColumn(fam1, q5, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
|
||||||
|
region.put(new Put(row).addColumn(fam1, q6, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
region.put(new Put(row).addColumn(fam1, q7, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
region
|
||||||
|
.put(new Put(row).addColumn(fam1, q8, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
|
||||||
|
// Flush so we are sure store scanning gets this right
|
||||||
|
region.flush(true);
|
||||||
|
|
||||||
|
// A query at time T+0 should return all cells
|
||||||
|
checkScan(8);
|
||||||
|
|
||||||
|
// Increment time to T+ttlSecs seconds
|
||||||
|
edge.incrementTime(ttlSecs * 1000);
|
||||||
|
checkScan(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkScan(int expectCellSize) throws IOException{
|
||||||
|
Scan s = new Scan().withStartRow(row);
|
||||||
|
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
|
||||||
|
ScannerContext scannerContext = contextBuilder.build();
|
||||||
|
RegionScanner scanner = region.getScanner(s);
|
||||||
|
List<Cell> kvs = new ArrayList<>();
|
||||||
|
scanner.next(kvs, scannerContext);
|
||||||
|
assertEquals(expectCellSize, kvs.size());
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIncrementTimestampsAreMonotonic() throws IOException {
|
public void testIncrementTimestampsAreMonotonic() throws IOException {
|
||||||
region = initHRegion(tableName, method, CONF, fam1);
|
region = initHRegion(tableName, method, CONF, fam1);
|
||||||
|
|
|
@ -1277,6 +1277,77 @@ public class TestHStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPreventLoopRead() throws Exception {
|
||||||
|
init(this.name.getMethodName());
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
// use small heart beat cells
|
||||||
|
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
|
||||||
|
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
|
||||||
|
EnvironmentEdgeManager.injectEdge(edge);
|
||||||
|
byte[] r0 = Bytes.toBytes("row0");
|
||||||
|
byte[] value0 = Bytes.toBytes("value0");
|
||||||
|
byte[] value1 = Bytes.toBytes("value1");
|
||||||
|
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||||
|
long ts = EnvironmentEdgeManager.currentTime();
|
||||||
|
long seqId = 100;
|
||||||
|
init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
|
||||||
|
ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(10).build(),
|
||||||
|
new MyStoreHook() {
|
||||||
|
@Override public long getSmallestReadPoint(HStore store) {
|
||||||
|
return seqId + 3;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// The cells having the value0 will be expired
|
||||||
|
store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
|
||||||
|
store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
|
||||||
|
store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
|
||||||
|
store.add(createCell(r0, qf4, ts + 10000 + 1, seqId, value1), memStoreSizing);
|
||||||
|
store.add(createCell(r0, qf5, ts, seqId, value0), memStoreSizing);
|
||||||
|
store.add(createCell(r0, qf6, ts + 10000 + 1, seqId, value1), memStoreSizing);
|
||||||
|
|
||||||
|
List<Cell> myList = new ArrayList<>();
|
||||||
|
Scan scan = new Scan().withStartRow(r0);
|
||||||
|
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(false);
|
||||||
|
// test normal scan, should return all the cells
|
||||||
|
ScannerContext scannerContext = contextBuilder.build();
|
||||||
|
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null,
|
||||||
|
seqId + 3)) {
|
||||||
|
scanner.next(myList, scannerContext);
|
||||||
|
assertEquals(6, myList.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
// test skip two ttl cells and return with empty results, default prevent loop skip is on
|
||||||
|
edge.incrementTime(10 * 1000);
|
||||||
|
scannerContext = contextBuilder.build();
|
||||||
|
myList.clear();
|
||||||
|
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null,
|
||||||
|
seqId + 3)) {
|
||||||
|
// r0
|
||||||
|
scanner.next(myList, scannerContext);
|
||||||
|
assertEquals(0, myList.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
// should scan all non-ttl expired cells by iterative next
|
||||||
|
int resultCells = 0;
|
||||||
|
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null,
|
||||||
|
seqId + 3)) {
|
||||||
|
boolean hasMore = true;
|
||||||
|
while (hasMore) {
|
||||||
|
myList.clear();
|
||||||
|
hasMore = scanner.next(myList, scannerContext);
|
||||||
|
assertTrue(myList.size() < 6);
|
||||||
|
resultCells += myList.size();
|
||||||
|
}
|
||||||
|
for (Cell c : myList) {
|
||||||
|
byte[] actualValue = CellUtil.cloneValue(c);
|
||||||
|
assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" + Bytes
|
||||||
|
.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(2, resultCells);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
|
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
|
Loading…
Reference in New Issue