HBASE-25709 Close region may stuck when region is compacting and skipped most cells read (#4536) (#5086)
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
This commit is contained in:
parent
6d463f7aab
commit
1ac7bbc8f3
|
@ -360,6 +360,11 @@ public final class HConstants {
|
||||||
public static final String COMPACTION_KV_MAX = "hbase.hstore.compaction.kv.max";
|
public static final String COMPACTION_KV_MAX = "hbase.hstore.compaction.kv.max";
|
||||||
public static final int COMPACTION_KV_MAX_DEFAULT = 10;
|
public static final int COMPACTION_KV_MAX_DEFAULT = 10;
|
||||||
|
|
||||||
|
/** Parameter name for the scanner size limit to be used in compactions */
|
||||||
|
public static final String COMPACTION_SCANNER_SIZE_MAX =
|
||||||
|
"hbase.hstore.compaction.scanner.size.limit";
|
||||||
|
public static final long COMPACTION_SCANNER_SIZE_MAX_DEFAULT = 10 * 1024 * 1024L; // 10MB
|
||||||
|
|
||||||
/** Parameter name for HBase instance root directory */
|
/** Parameter name for HBase instance root directory */
|
||||||
public static final String HBASE_DIR = "hbase.rootdir";
|
public static final String HBASE_DIR = "hbase.rootdir";
|
||||||
|
|
||||||
|
|
|
@ -346,8 +346,10 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
|
long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
|
||||||
boolean finished = false;
|
boolean finished = false;
|
||||||
|
|
||||||
ScannerContext scannerContext =
|
ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
|
||||||
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
|
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
|
||||||
|
compactScannerSizeLimit)
|
||||||
|
.build();
|
||||||
throughputController.start(compactionName);
|
throughputController.start(compactionName);
|
||||||
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
|
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
|
||||||
long shippedCallSizeLimit =
|
long shippedCallSizeLimit =
|
||||||
|
|
|
@ -82,6 +82,7 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
protected final Configuration conf;
|
protected final Configuration conf;
|
||||||
protected final HStore store;
|
protected final HStore store;
|
||||||
protected final int compactionKVMax;
|
protected final int compactionKVMax;
|
||||||
|
protected final long compactScannerSizeLimit;
|
||||||
protected final Compression.Algorithm majorCompactionCompression;
|
protected final Compression.Algorithm majorCompactionCompression;
|
||||||
protected final Compression.Algorithm minorCompactionCompression;
|
protected final Compression.Algorithm minorCompactionCompression;
|
||||||
|
|
||||||
|
@ -108,6 +109,8 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
this.store = store;
|
this.store = store;
|
||||||
this.compactionKVMax =
|
this.compactionKVMax =
|
||||||
this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
|
this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
|
||||||
|
this.compactScannerSizeLimit = this.conf.getLong(HConstants.COMPACTION_SCANNER_SIZE_MAX,
|
||||||
|
HConstants.COMPACTION_SCANNER_SIZE_MAX_DEFAULT);
|
||||||
this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
|
this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
|
||||||
? Compression.Algorithm.NONE
|
? Compression.Algorithm.NONE
|
||||||
: store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
|
: store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
|
||||||
|
@ -428,8 +431,10 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
|
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
|
||||||
long now = 0;
|
long now = 0;
|
||||||
boolean hasMore;
|
boolean hasMore;
|
||||||
ScannerContext scannerContext =
|
ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
|
||||||
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
|
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
|
||||||
|
compactScannerSizeLimit)
|
||||||
|
.build();
|
||||||
|
|
||||||
throughputController.start(compactionName);
|
throughputController.start(compactionName);
|
||||||
Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
|
Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
|
||||||
|
|
|
@ -140,8 +140,10 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
|
||||||
long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
|
long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
|
||||||
boolean finished = false;
|
boolean finished = false;
|
||||||
|
|
||||||
ScannerContext scannerContext =
|
ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
|
||||||
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
|
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
|
||||||
|
compactScannerSizeLimit)
|
||||||
|
.build();
|
||||||
throughputController.start(compactionName);
|
throughputController.start(compactionName);
|
||||||
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
|
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
|
||||||
long shippedCallSizeLimit =
|
long shippedCallSizeLimit =
|
||||||
|
|
|
@ -119,6 +119,7 @@ public class TestCompaction {
|
||||||
// Set cache flush size to 1MB
|
// Set cache flush size to 1MB
|
||||||
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
|
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
|
||||||
conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
|
conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
|
||||||
|
conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L);
|
||||||
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
|
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
|
||||||
NoLimitThroughputController.class.getName());
|
NoLimitThroughputController.class.getName());
|
||||||
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
|
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
|
||||||
|
|
|
@ -6910,6 +6910,75 @@ public class TestHRegion {
|
||||||
assertNull(r.getValue(fam1, q1));
|
assertNull(r.getValue(fam1, q1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTTLsUsingSmallHeartBeatCells() throws IOException {
|
||||||
|
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
|
||||||
|
EnvironmentEdgeManager.injectEdge(edge);
|
||||||
|
|
||||||
|
final byte[] row = Bytes.toBytes("testRow");
|
||||||
|
final byte[] q1 = Bytes.toBytes("q1");
|
||||||
|
final byte[] q2 = Bytes.toBytes("q2");
|
||||||
|
final byte[] q3 = Bytes.toBytes("q3");
|
||||||
|
final byte[] q4 = Bytes.toBytes("q4");
|
||||||
|
final byte[] q5 = Bytes.toBytes("q5");
|
||||||
|
final byte[] q6 = Bytes.toBytes("q6");
|
||||||
|
final byte[] q7 = Bytes.toBytes("q7");
|
||||||
|
final byte[] q8 = Bytes.toBytes("q8");
|
||||||
|
|
||||||
|
// 10 seconds
|
||||||
|
int ttlSecs = 10;
|
||||||
|
TableDescriptor tableDescriptor =
|
||||||
|
TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setColumnFamily(
|
||||||
|
ColumnFamilyDescriptorBuilder.newBuilder(fam1).setTimeToLive(ttlSecs).build()).build();
|
||||||
|
|
||||||
|
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
|
||||||
|
// using small heart beat cells
|
||||||
|
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
|
||||||
|
|
||||||
|
region = HBaseTestingUtility.createRegionAndWAL(
|
||||||
|
RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(),
|
||||||
|
TEST_UTIL.getDataTestDir(), conf, tableDescriptor);
|
||||||
|
assertNotNull(region);
|
||||||
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
|
// Add a cell that will expire in 5 seconds via cell TTL
|
||||||
|
region.put(new Put(row).addColumn(fam1, q1, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
region.put(new Put(row).addColumn(fam1, q3, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
// Add a cell that will expire after 10 seconds via family setting
|
||||||
|
region
|
||||||
|
.put(new Put(row).addColumn(fam1, q4, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
region
|
||||||
|
.put(new Put(row).addColumn(fam1, q5, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
|
||||||
|
region.put(new Put(row).addColumn(fam1, q6, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
region.put(new Put(row).addColumn(fam1, q7, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
region
|
||||||
|
.put(new Put(row).addColumn(fam1, q8, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
|
||||||
|
// Flush so we are sure store scanning gets this right
|
||||||
|
region.flush(true);
|
||||||
|
|
||||||
|
// A query at time T+0 should return all cells
|
||||||
|
checkScan(8);
|
||||||
|
region.delete(new Delete(row).addColumn(fam1, q8));
|
||||||
|
|
||||||
|
// Increment time to T+ttlSecs seconds
|
||||||
|
edge.incrementTime(ttlSecs * 1000);
|
||||||
|
checkScan(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkScan(int expectCellSize) throws IOException {
|
||||||
|
Scan s = new Scan().withStartRow(row);
|
||||||
|
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
|
||||||
|
ScannerContext scannerContext = contextBuilder.build();
|
||||||
|
RegionScanner scanner = region.getScanner(s);
|
||||||
|
List<Cell> kvs = new ArrayList<>();
|
||||||
|
scanner.next(kvs, scannerContext);
|
||||||
|
assertEquals(expectCellSize, kvs.size());
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIncrementTimestampsAreMonotonic() throws IOException {
|
public void testIncrementTimestampsAreMonotonic() throws IOException {
|
||||||
region = initHRegion(tableName, method, CONF, fam1);
|
region = initHRegion(tableName, method, CONF, fam1);
|
||||||
|
|
Loading…
Reference in New Issue