HBASE-20457 Return immediately for a scan rpc call when we want to switch from pread to stream
This commit is contained in:
parent
b7def9b690
commit
60b8344cf1
|
@ -279,6 +279,7 @@ public class RpcRetryingCallerWithReadReplicas {
|
||||||
throws RetriesExhaustedException, DoNotRetryIOException {
|
throws RetriesExhaustedException, DoNotRetryIOException {
|
||||||
Throwable t = e.getCause();
|
Throwable t = e.getCause();
|
||||||
assert t != null; // That's what ExecutionException is about: holding an exception
|
assert t != null; // That's what ExecutionException is about: holding an exception
|
||||||
|
t.printStackTrace();
|
||||||
|
|
||||||
if (t instanceof RetriesExhaustedException) {
|
if (t instanceof RetriesExhaustedException) {
|
||||||
throw (RetriesExhaustedException) t;
|
throw (RetriesExhaustedException) t;
|
||||||
|
|
|
@ -99,6 +99,12 @@ public class ScannerContext {
|
||||||
|
|
||||||
private Cell lastPeekedCell = null;
|
private Cell lastPeekedCell = null;
|
||||||
|
|
||||||
|
// Setting this to true has the same effect as reaching the time limit.
|
||||||
|
// This is used when you want to make the current RSRpcService.scan return immediately. For
|
||||||
|
// example, when we want to switch from pread to stream, we can only do it after the rpc call is
|
||||||
|
// returned.
|
||||||
|
private boolean returnImmediately;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tracks the relevant server side metrics during scans. null when metrics should not be tracked
|
* Tracks the relevant server side metrics during scans. null when metrics should not be tracked
|
||||||
*/
|
*/
|
||||||
|
@ -278,7 +284,8 @@ public class ScannerContext {
|
||||||
* @return true if the time limit can be enforced in the checker's scope
|
* @return true if the time limit can be enforced in the checker's scope
|
||||||
*/
|
*/
|
||||||
boolean hasTimeLimit(LimitScope checkerScope) {
|
boolean hasTimeLimit(LimitScope checkerScope) {
|
||||||
return limits.canEnforceTimeLimitFromScope(checkerScope) && limits.getTime() > 0;
|
return limits.canEnforceTimeLimitFromScope(checkerScope) &&
|
||||||
|
(limits.getTime() > 0 || returnImmediately);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -338,7 +345,8 @@ public class ScannerContext {
|
||||||
* @return true when the limit is enforceable from the checker's scope and it has been reached
|
* @return true when the limit is enforceable from the checker's scope and it has been reached
|
||||||
*/
|
*/
|
||||||
boolean checkTimeLimit(LimitScope checkerScope) {
|
boolean checkTimeLimit(LimitScope checkerScope) {
|
||||||
return hasTimeLimit(checkerScope) && (System.currentTimeMillis() >= limits.getTime());
|
return hasTimeLimit(checkerScope) &&
|
||||||
|
(returnImmediately || System.currentTimeMillis() >= limits.getTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -358,6 +366,10 @@ public class ScannerContext {
|
||||||
this.lastPeekedCell = lastPeekedCell;
|
this.lastPeekedCell = lastPeekedCell;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void returnImmediately() {
|
||||||
|
this.returnImmediately = true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
|
@ -570,11 +582,6 @@ public class ScannerContext {
|
||||||
LimitFields() {
|
LimitFields() {
|
||||||
}
|
}
|
||||||
|
|
||||||
LimitFields(int batch, LimitScope sizeScope, long size, long heapSize, LimitScope timeScope,
|
|
||||||
long time) {
|
|
||||||
setFields(batch, sizeScope, size, heapSize, timeScope, time);
|
|
||||||
}
|
|
||||||
|
|
||||||
void copy(LimitFields limitsToCopy) {
|
void copy(LimitFields limitsToCopy) {
|
||||||
if (limitsToCopy != null) {
|
if (limitsToCopy != null) {
|
||||||
setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getDataSize(),
|
setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getDataSize(),
|
||||||
|
@ -722,12 +729,6 @@ public class ScannerContext {
|
||||||
// such AND data cells of Cells which are in on heap area.
|
// such AND data cells of Cells which are in on heap area.
|
||||||
long heapSize = DEFAULT_SIZE;
|
long heapSize = DEFAULT_SIZE;
|
||||||
|
|
||||||
/**
|
|
||||||
* Fields keep their default values.
|
|
||||||
*/
|
|
||||||
ProgressFields() {
|
|
||||||
}
|
|
||||||
|
|
||||||
ProgressFields(int batch, long size, long heapSize) {
|
ProgressFields(int batch, long size, long heapSize) {
|
||||||
setFields(batch, size, heapSize);
|
setFields(batch, size, heapSize);
|
||||||
}
|
}
|
||||||
|
|
|
@ -553,7 +553,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
|
|
||||||
LOOP: do {
|
LOOP: do {
|
||||||
// Update and check the time limit based on the configured value of cellsPerTimeoutCheck
|
// Update and check the time limit based on the configured value of cellsPerTimeoutCheck
|
||||||
if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
|
// Or if the preadMaxBytes is reached and we may want to return so we can switch to stream in
|
||||||
|
// the shipped method below.
|
||||||
|
if (kvsScanned % cellsPerHeartbeatCheck == 0 || (scanUsePread &&
|
||||||
|
scan.getReadType() == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) {
|
||||||
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
|
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
|
||||||
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
|
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
|
||||||
}
|
}
|
||||||
|
@ -565,6 +568,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
checkScanOrder(prevCell, cell, comparator);
|
checkScanOrder(prevCell, cell, comparator);
|
||||||
int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
|
int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
|
||||||
bytesRead += cellSize;
|
bytesRead += cellSize;
|
||||||
|
if (scanUsePread && scan.getReadType() == Scan.ReadType.DEFAULT &&
|
||||||
|
bytesRead > preadMaxBytes) {
|
||||||
|
// return immediately if we want to switch from pread to stream. We need this because we can
|
||||||
|
// only switch in the shipped method, if the user uses a filter to filter out everything and rpc
|
||||||
|
// timeout is very large then the shipped method will never be called until the whole scan
|
||||||
|
// is finished, but at that time we have already scanned all the data...
|
||||||
|
// See HBASE-20457 for more details.
|
||||||
|
// And there is still a scenario that cannot be handled. If we have a very large row, which
|
||||||
|
// has millions of qualifiers, and filter.filterRow is used, then even if we set the flag
|
||||||
|
// here, we still need to scan all the qualifiers before returning...
|
||||||
|
scannerContext.returnImmediately();
|
||||||
|
}
|
||||||
prevCell = cell;
|
prevCell = cell;
|
||||||
scannerContext.setLastPeekedCell(cell);
|
scannerContext.setLastPeekedCell(cell);
|
||||||
topChanged = false;
|
topChanged = false;
|
||||||
|
|
|
@ -34,13 +34,17 @@ import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
|
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
|
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@ -49,7 +53,7 @@ public class TestSwitchToStreamRead {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);
|
HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);
|
||||||
|
|
||||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
@ -73,18 +77,18 @@ public class TestSwitchToStreamRead {
|
||||||
VALUE_PREFIX = sb.append("-").toString();
|
VALUE_PREFIX = sb.append("-").toString();
|
||||||
REGION = UTIL.createLocalHRegion(
|
REGION = UTIL.createLocalHRegion(
|
||||||
TableDescriptorBuilder.newBuilder(TABLE_NAME)
|
TableDescriptorBuilder.newBuilder(TABLE_NAME)
|
||||||
.setColumnFamily(
|
.setColumnFamily(
|
||||||
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
|
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
|
||||||
.build(),
|
.build(),
|
||||||
null, null);
|
null, null);
|
||||||
for (int i = 0; i < 900; i++) {
|
for (int i = 0; i < 900; i++) {
|
||||||
REGION
|
REGION
|
||||||
.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
|
.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
|
||||||
}
|
}
|
||||||
REGION.flush(true);
|
REGION.flush(true);
|
||||||
for (int i = 900; i < 1000; i++) {
|
for (int i = 900; i < 1000; i++) {
|
||||||
REGION
|
REGION
|
||||||
.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
|
.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,8 +101,8 @@ public class TestSwitchToStreamRead {
|
||||||
@Test
|
@Test
|
||||||
public void test() throws IOException {
|
public void test() throws IOException {
|
||||||
try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
|
try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
|
||||||
StoreScanner storeScanner = (StoreScanner) (scanner)
|
StoreScanner storeScanner =
|
||||||
.getStoreHeapForTesting().getCurrentForTesting();
|
(StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
|
||||||
for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
|
for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
|
||||||
if (kvs instanceof StoreFileScanner) {
|
if (kvs instanceof StoreFileScanner) {
|
||||||
StoreFileScanner sfScanner = (StoreFileScanner) kvs;
|
StoreFileScanner sfScanner = (StoreFileScanner) kvs;
|
||||||
|
@ -134,4 +138,125 @@ public class TestSwitchToStreamRead {
|
||||||
assertFalse(sf.isReferencedInReads());
|
assertFalse(sf.isReferencedInReads());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static final class MatchLastRowKeyFilter extends FilterBase {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean filterRowKey(Cell cell) throws IOException {
|
||||||
|
return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testFilter(Filter filter) throws IOException {
|
||||||
|
try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
|
||||||
|
StoreScanner storeScanner =
|
||||||
|
(StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
|
||||||
|
for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
|
||||||
|
if (kvs instanceof StoreFileScanner) {
|
||||||
|
StoreFileScanner sfScanner = (StoreFileScanner) kvs;
|
||||||
|
// starting from pread so we use shared reader here.
|
||||||
|
assertTrue(sfScanner.getReader().shared);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
List<Cell> cells = new ArrayList<>();
|
||||||
|
// should return before finishing the scan as we want to switch from pread to stream
|
||||||
|
assertTrue(scanner.next(cells,
|
||||||
|
ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
|
||||||
|
assertTrue(cells.isEmpty());
|
||||||
|
scanner.shipped();
|
||||||
|
|
||||||
|
for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
|
||||||
|
if (kvs instanceof StoreFileScanner) {
|
||||||
|
StoreFileScanner sfScanner = (StoreFileScanner) kvs;
|
||||||
|
// we should have converted to stream read by now.
|
||||||
|
assertFalse(sfScanner.getReader().shared);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertFalse(scanner.next(cells,
|
||||||
|
ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
|
||||||
|
Result result = Result.create(cells);
|
||||||
|
assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL)));
|
||||||
|
cells.clear();
|
||||||
|
scanner.shipped();
|
||||||
|
}
|
||||||
|
// make sure all scanners are closed.
|
||||||
|
for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
|
||||||
|
assertFalse(sf.isReferencedInReads());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We use a different logic to implement filterRowKey, where we will keep calling kvHeap.next
|
||||||
|
// until the row key is changed. And there we can only use NoLimitScannerContext so we can not
|
||||||
|
// make the upper layer return immediately. Simply not using NoLimitScannerContext will lead to
|
||||||
|
// an infinite loop. Need to dig more, the code is way too complicated...
|
||||||
|
@Ignore
|
||||||
|
@Test
|
||||||
|
public void testFilterRowKey() throws IOException {
|
||||||
|
testFilter(new MatchLastRowKeyFilter());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class MatchLastRowCellNextColFilter extends FilterBase {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReturnCode filterCell(Cell c) throws IOException {
|
||||||
|
if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
|
||||||
|
return ReturnCode.INCLUDE;
|
||||||
|
} else {
|
||||||
|
return ReturnCode.NEXT_COL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilterCellNextCol() throws IOException {
|
||||||
|
testFilter(new MatchLastRowCellNextColFilter());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class MatchLastRowCellNextRowFilter extends FilterBase {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReturnCode filterCell(Cell c) throws IOException {
|
||||||
|
if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
|
||||||
|
return ReturnCode.INCLUDE;
|
||||||
|
} else {
|
||||||
|
return ReturnCode.NEXT_ROW;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilterCellNextRow() throws IOException {
|
||||||
|
testFilter(new MatchLastRowCellNextRowFilter());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class MatchLastRowFilterRowFilter extends FilterBase {
|
||||||
|
|
||||||
|
private boolean exclude;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void filterRowCells(List<Cell> kvs) throws IOException {
|
||||||
|
Cell c = kvs.get(0);
|
||||||
|
exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() throws IOException {
|
||||||
|
exclude = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean filterRow() throws IOException {
|
||||||
|
return exclude;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasFilterRow() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilterRow() throws IOException {
|
||||||
|
testFilter(new MatchLastRowFilterRowFilter());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue