Revert "HBASE-25709 Close region may stuck when region is compacting and skipped most cells read (#3117)" (#4524)
This reverts commit f3a48d1910
.
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
3e98869925
commit
5e34cdf1ef
|
@ -749,11 +749,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("UNEXPECTED");
|
throw new RuntimeException("UNEXPECTED");
|
||||||
}
|
}
|
||||||
|
|
||||||
// when reaching the heartbeat cells, try to return from the loop.
|
|
||||||
if (kvsScanned % cellsPerHeartbeatCheck == 0) {
|
|
||||||
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
|
|
||||||
}
|
|
||||||
} while ((cell = this.heap.peek()) != null);
|
} while ((cell = this.heap.peek()) != null);
|
||||||
|
|
||||||
if (count > 0) {
|
if (count > 0) {
|
||||||
|
|
|
@ -6923,74 +6923,6 @@ public class TestHRegion {
|
||||||
assertNull(r.getValue(fam1, q1));
|
assertNull(r.getValue(fam1, q1));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testTTLsUsingSmallHeartBeatCells() throws IOException {
|
|
||||||
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
|
|
||||||
EnvironmentEdgeManager.injectEdge(edge);
|
|
||||||
|
|
||||||
final byte[] row = Bytes.toBytes("testRow");
|
|
||||||
final byte[] q1 = Bytes.toBytes("q1");
|
|
||||||
final byte[] q2 = Bytes.toBytes("q2");
|
|
||||||
final byte[] q3 = Bytes.toBytes("q3");
|
|
||||||
final byte[] q4 = Bytes.toBytes("q4");
|
|
||||||
final byte[] q5 = Bytes.toBytes("q5");
|
|
||||||
final byte[] q6 = Bytes.toBytes("q6");
|
|
||||||
final byte[] q7 = Bytes.toBytes("q7");
|
|
||||||
final byte[] q8 = Bytes.toBytes("q8");
|
|
||||||
|
|
||||||
// 10 seconds
|
|
||||||
int ttlSecs = 10;
|
|
||||||
TableDescriptor tableDescriptor =
|
|
||||||
TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setColumnFamily(
|
|
||||||
ColumnFamilyDescriptorBuilder.newBuilder(fam1).setTimeToLive(ttlSecs).build()).build();
|
|
||||||
|
|
||||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
|
||||||
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
|
|
||||||
// using small heart beat cells
|
|
||||||
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
|
|
||||||
|
|
||||||
region = HBaseTestingUtil.createRegionAndWAL(
|
|
||||||
RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(),
|
|
||||||
TEST_UTIL.getDataTestDir(), conf, tableDescriptor);
|
|
||||||
assertNotNull(region);
|
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
|
||||||
// Add a cell that will expire in 5 seconds via cell TTL
|
|
||||||
region.put(new Put(row).addColumn(fam1, q1, now, HConstants.EMPTY_BYTE_ARRAY));
|
|
||||||
region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
|
|
||||||
region.put(new Put(row).addColumn(fam1, q3, now, HConstants.EMPTY_BYTE_ARRAY));
|
|
||||||
// Add a cell that will expire after 10 seconds via family setting
|
|
||||||
region
|
|
||||||
.put(new Put(row).addColumn(fam1, q4, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
|
|
||||||
region
|
|
||||||
.put(new Put(row).addColumn(fam1, q5, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
|
|
||||||
|
|
||||||
region.put(new Put(row).addColumn(fam1, q6, now, HConstants.EMPTY_BYTE_ARRAY));
|
|
||||||
region.put(new Put(row).addColumn(fam1, q7, now, HConstants.EMPTY_BYTE_ARRAY));
|
|
||||||
region
|
|
||||||
.put(new Put(row).addColumn(fam1, q8, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
|
|
||||||
|
|
||||||
// Flush so we are sure store scanning gets this right
|
|
||||||
region.flush(true);
|
|
||||||
|
|
||||||
// A query at time T+0 should return all cells
|
|
||||||
checkScan(8);
|
|
||||||
|
|
||||||
// Increment time to T+ttlSecs seconds
|
|
||||||
edge.incrementTime(ttlSecs * 1000);
|
|
||||||
checkScan(3);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checkScan(int expectCellSize) throws IOException {
|
|
||||||
Scan s = new Scan().withStartRow(row);
|
|
||||||
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
|
|
||||||
ScannerContext scannerContext = contextBuilder.build();
|
|
||||||
RegionScanner scanner = region.getScanner(s);
|
|
||||||
List<Cell> kvs = new ArrayList<>();
|
|
||||||
scanner.next(kvs, scannerContext);
|
|
||||||
assertEquals(expectCellSize, kvs.size());
|
|
||||||
scanner.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIncrementTimestampsAreMonotonic() throws IOException {
|
public void testIncrementTimestampsAreMonotonic() throws IOException {
|
||||||
region = initHRegion(tableName, method, CONF, fam1);
|
region = initHRegion(tableName, method, CONF, fam1);
|
||||||
|
|
|
@ -1258,75 +1258,6 @@ public class TestHStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testPreventLoopRead() throws Exception {
|
|
||||||
init(this.name.getMethodName());
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
|
||||||
// use small heart beat cells
|
|
||||||
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
|
|
||||||
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
|
|
||||||
EnvironmentEdgeManager.injectEdge(edge);
|
|
||||||
byte[] r0 = Bytes.toBytes("row0");
|
|
||||||
byte[] value0 = Bytes.toBytes("value0");
|
|
||||||
byte[] value1 = Bytes.toBytes("value1");
|
|
||||||
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
|
||||||
long ts = EnvironmentEdgeManager.currentTime();
|
|
||||||
long seqId = 100;
|
|
||||||
init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
|
|
||||||
ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(10).build(),
|
|
||||||
new MyStoreHook() {
|
|
||||||
@Override
|
|
||||||
public long getSmallestReadPoint(HStore store) {
|
|
||||||
return seqId + 3;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
// The cells having the value0 will be expired
|
|
||||||
store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
|
|
||||||
store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
|
|
||||||
store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
|
|
||||||
store.add(createCell(r0, qf4, ts + 10000 + 1, seqId, value1), memStoreSizing);
|
|
||||||
store.add(createCell(r0, qf5, ts, seqId, value0), memStoreSizing);
|
|
||||||
store.add(createCell(r0, qf6, ts + 10000 + 1, seqId, value1), memStoreSizing);
|
|
||||||
|
|
||||||
List<Cell> myList = new ArrayList<>();
|
|
||||||
Scan scan = new Scan().withStartRow(r0);
|
|
||||||
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(false);
|
|
||||||
// test normal scan, should return all the cells
|
|
||||||
ScannerContext scannerContext = contextBuilder.build();
|
|
||||||
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
|
|
||||||
scanner.next(myList, scannerContext);
|
|
||||||
assertEquals(6, myList.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
// test skip two ttl cells and return with empty results, default prevent loop skip is on
|
|
||||||
edge.incrementTime(10 * 1000);
|
|
||||||
scannerContext = contextBuilder.build();
|
|
||||||
myList.clear();
|
|
||||||
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
|
|
||||||
// r0
|
|
||||||
scanner.next(myList, scannerContext);
|
|
||||||
assertEquals(0, myList.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
// should scan all non-ttl expired cells by iterative next
|
|
||||||
int resultCells = 0;
|
|
||||||
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
|
|
||||||
boolean hasMore = true;
|
|
||||||
while (hasMore) {
|
|
||||||
myList.clear();
|
|
||||||
hasMore = scanner.next(myList, scannerContext);
|
|
||||||
assertTrue(myList.size() < 6);
|
|
||||||
resultCells += myList.size();
|
|
||||||
}
|
|
||||||
for (Cell c : myList) {
|
|
||||||
byte[] actualValue = CellUtil.cloneValue(c);
|
|
||||||
assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
|
|
||||||
+ Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertEquals(2, resultCells);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
|
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
|
Loading…
Reference in New Issue