HBASE-25582 Support setting scan ReadType to be STREAM at cluster level (#3040)
Signed-off-by: zhangduo <zhangduo@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
da2b66bf6b
commit
e793303d7f
|
@ -132,9 +132,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
|
public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If the read type if Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned
|
* If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned
|
||||||
* reaches this limit, we will reopen the scanner with stream. The default value is 4 times of
|
* reaches this limit, we will reopen the scanner with stream. The default value is 4 times of
|
||||||
* block size for this store.
|
* block size for this store.
|
||||||
|
* If configured with a value <0, for all scans with ReadType DEFAULT, we will open scanner with
|
||||||
|
* stream mode itself.
|
||||||
*/
|
*/
|
||||||
public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";
|
public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";
|
||||||
|
|
||||||
|
@ -180,6 +182,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1)
|
this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1)
|
||||||
&& (store == null || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL);
|
&& (store == null || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL);
|
||||||
this.maxRowSize = scanInfo.getTableMaxRowSize();
|
this.maxRowSize = scanInfo.getTableMaxRowSize();
|
||||||
|
this.preadMaxBytes = scanInfo.getPreadMaxBytes();
|
||||||
if (get) {
|
if (get) {
|
||||||
this.readType = Scan.ReadType.PREAD;
|
this.readType = Scan.ReadType.PREAD;
|
||||||
this.scanUsePread = true;
|
this.scanUsePread = true;
|
||||||
|
@ -190,7 +193,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
this.scanUsePread = false;
|
this.scanUsePread = false;
|
||||||
} else {
|
} else {
|
||||||
if (scan.getReadType() == Scan.ReadType.DEFAULT) {
|
if (scan.getReadType() == Scan.ReadType.DEFAULT) {
|
||||||
this.readType = scanInfo.isUsePread() ? Scan.ReadType.PREAD : Scan.ReadType.DEFAULT;
|
if (scanInfo.isUsePread()) {
|
||||||
|
this.readType = Scan.ReadType.PREAD;
|
||||||
|
} else if (this.preadMaxBytes < 0) {
|
||||||
|
this.readType = Scan.ReadType.STREAM;
|
||||||
|
} else {
|
||||||
|
this.readType = Scan.ReadType.DEFAULT;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
this.readType = scan.getReadType();
|
this.readType = scan.getReadType();
|
||||||
}
|
}
|
||||||
|
@ -198,7 +207,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
// readType is default if the scan keeps running for a long time.
|
// readType is default if the scan keeps running for a long time.
|
||||||
this.scanUsePread = this.readType != Scan.ReadType.STREAM;
|
this.scanUsePread = this.readType != Scan.ReadType.STREAM;
|
||||||
}
|
}
|
||||||
this.preadMaxBytes = scanInfo.getPreadMaxBytes();
|
|
||||||
this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
|
this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
|
||||||
// Parallel seeking is on if the config allows and more there is more than one store file.
|
// Parallel seeking is on if the config allows and more there is more than one store file.
|
||||||
if (store != null && store.getStorefilesCount() > 1) {
|
if (store != null && store.getStorefilesCount() > 1) {
|
||||||
|
|
|
@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
|
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
|
||||||
|
@ -1664,6 +1665,22 @@ public class TestHStore {
|
||||||
assertFalse(heap.equals(heap2));
|
assertFalse(heap.equals(heap2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
|
||||||
|
// Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
|
||||||
|
conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
|
||||||
|
MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.addFamily(family);
|
||||||
|
// ReadType on Scan is still DEFAULT only.
|
||||||
|
assertEquals(ReadType.DEFAULT, scan.getReadType());
|
||||||
|
StoreScanner storeScanner = (StoreScanner) store.getScanner(scan,
|
||||||
|
scan.getFamilyMap().get(family), Long.MAX_VALUE);
|
||||||
|
assertFalse(storeScanner.isScanUsePread());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException {
|
public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException {
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
|
Loading…
Reference in New Issue