HBASE-15576 Scanning cursor to prevent blocking long time on ResultScanner.next()

This commit is contained in:
Phil Yang 2017-06-06 15:39:24 +08:00
parent 9cb57ae35e
commit 381c89b5cc
14 changed files with 1340 additions and 103 deletions

View File

@ -505,6 +505,21 @@ public abstract class ClientScanner extends AbstractClientScanner {
break;
}
}
if (cache.isEmpty() && !closed && scan.isNeedCursorResult()) {
if (callable.isHeartbeatMessage() && callable.getCursor() != null) {
// Use cursor row key from server
cache.add(Result.createCursorResult(callable.getCursor()));
break;
}
if (values.length > 0) {
// It is size limit exceed and we need return the last Result's row.
// When user setBatch and the scanner is reopened, the server may return Results that
// user has seen and the last Result can not be seen because the number is not enough.
// So the row keys of results may not be same, we must use the last one.
cache.add(Result.createCursorResult(new Cursor(values[values.length - 1].getRow())));
break;
}
}
if (countdown <= 0) {
// we have enough result.
closeScannerIfExhausted(regionExhausted);

View File

@ -27,4 +27,8 @@ public class ClientUtil {
public static boolean areScanStartRowAndStopRowEqual(byte[] startRow, byte[] stopRow) {
return startRow != null && startRow.length > 0 && Bytes.equals(startRow, stopRow);
}
public static Cursor createCursor(byte[] row) {
return new Cursor(row);
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Scan cursor to tell client where server is scanning
* {@link Scan#setNeedCursorResult(boolean)}
* {@link Result#isCursor()}
* {@link Result#getCursor()}
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cursor {
private final byte[] row;
Cursor(byte[] row) {
this.row = row;
}
public byte[] getRow() {
return row;
}
}

View File

@ -107,6 +107,8 @@ public class Result implements CellScannable, CellScanner {
private final boolean readonly;
private Cursor cursor = null;
/**
* Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #rawCells()}.
* Use this to represent no results if <code>null</code> won't do or in old 'mapred' as oppposed to 'mapreduce' package
@ -188,6 +190,15 @@ public class Result implements CellScannable, CellScanner {
return new Result(cells, null, stale, mayHaveMoreCellsInRow);
}
public static Result createCursorResult(Cursor cursor) {
return new Result(cursor);
}
private Result(Cursor cursor) {
this.cursor = cursor;
this.readonly = false;
}
/** Private ctor. Use {@link #create(Cell[])}. */
private Result(Cell[] cells, Boolean exists, boolean stale, boolean mayHaveMoreCellsInRow) {
this.cells = cells;
@ -1030,4 +1041,38 @@ public class Result implements CellScannable, CellScanner {
throw new UnsupportedOperationException("Attempting to modify readonly EMPTY_RESULT!");
}
}
/**
* Return true if this Result is a cursor to tell users where the server has scanned.
* In this Result the only meaningful method is {@link #getCursor()}.
*
* {@code
* while (r = scanner.next() && r != null) {
* if(r.isCursor()){
* // scanning is not end, it is a cursor, save its row key and close scanner if you want, or
* // just continue the loop to call next().
* } else {
* // just like before
* }
* }
* // scanning is end
*
* }
* {@link Scan#setNeedCursorResult(boolean)}
* {@link Cursor}
* {@link #getCursor()}
*/
public boolean isCursor() {
return cursor != null ;
}
/**
* Return the cursor if this Result is a cursor result.
* {@link Scan#setNeedCursorResult(boolean)}
* {@link Cursor}
* {@link #isCursor()}
*/
public Cursor getCursor(){
return cursor;
}
}

View File

@ -187,6 +187,8 @@ public class Scan extends Query {
*/
private ReadType readType = ReadType.DEFAULT;
private boolean needCursorResult = false;
/**
* Create a Scan operation across all rows.
*/
@ -275,6 +277,7 @@ public class Scan extends Query {
}
this.mvccReadPoint = scan.getMvccReadPoint();
this.limit = scan.getLimit();
this.needCursorResult = scan.isNeedCursorResult();
}
/**
@ -1209,4 +1212,43 @@ public class Scan extends Query {
Scan resetMvccReadPoint() {
return setMvccReadPoint(-1L);
}
/**
* When the server is slow or we scan a table with many deleted data or we use a sparse filter,
* the server will response heartbeat to prevent timeout. However the scanner will return a Result
* only when client can do it. So if there are many heartbeats, the blocking time on
* ResultScanner#next() may be very long, which is not friendly to online services.
*
* Set this to true then you can get a special Result whose #isCursor() returns true and is not
* contains any real data. It only tells you where the server has scanned. You can call next
* to continue scanning or open a new scanner with this row key as start row whenever you want.
*
* Users can get a cursor when and only when there is a response from the server but we can not
* return a Result to users, for example, this response is a heartbeat or there are partial cells
* but users do not allow partial result.
*
* Now the cursor is in row level which means the special Result will only contains a row key.
* {@link Result#isCursor()}
* {@link Result#getCursor()}
* {@link Cursor}
*/
public Scan setNeedCursorResult(boolean needCursorResult) {
this.needCursorResult = needCursorResult;
return this;
}
public boolean isNeedCursorResult() {
return needCursorResult;
}
/**
* Create a new Scan with a cursor. It only set the position information like start row key.
* The others (like cfs, stop row, limit) should still be filled in by the user.
* {@link Result#isCursor()}
* {@link Result#getCursor()}
* {@link Cursor}
*/
public static Scan createScanFromCursor(Cursor cursor) {
return new Scan().withStartRow(cursor.getRow());
}
}

View File

@ -100,6 +100,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
}
}
protected Cursor cursor;
// indicate if it is a remote server call
protected boolean isRegionServerRemote = true;
private long nextCallSeq = 0;
@ -168,7 +170,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
checkIfRegionServerIsRemote();
instantiated = true;
}
cursor = null;
// check how often we retry.
// HConnectionManager will call instantiateServer with reload==true
// if and only if for retries.
@ -274,7 +276,11 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
response = next();
}
long timestamp = System.currentTimeMillis();
setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage();
setHeartbeatMessage(isHeartBeat);
if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) {
cursor = ProtobufUtil.toCursor(response.getCursor());
}
Result[] rrs = ResponseConverter.getResults(controller.cellScanner(), response);
if (logScannerActivity) {
long now = System.currentTimeMillis();
@ -320,6 +326,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
return heartbeatMessage;
}
public Cursor getCursor() {
return cursor;
}
private void setHeartbeatMessage(boolean heartbeatMessage) {
this.heartbeatMessage = heartbeatMessage;
}

View File

@ -303,6 +303,10 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
}
public Cursor getCursor() {
return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
}
private void addCallsForCurrentReplica(
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);

View File

@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClientUtil;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Cursor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@ -1058,6 +1059,9 @@ public final class ProtobufUtil {
if (scan.getReadType() != Scan.ReadType.DEFAULT) {
scanBuilder.setReadType(toReadType(scan.getReadType()));
}
if (scan.isNeedCursorResult()) {
scanBuilder.setNeedCursorResult(true);
}
return scanBuilder.build();
}
@ -1167,9 +1171,28 @@ public final class ProtobufUtil {
} else if (proto.hasReadType()) {
scan.setReadType(toReadType(proto.getReadType()));
}
if (proto.getNeedCursorResult()) {
scan.setNeedCursorResult(true);
}
return scan;
}
public static ClientProtos.Cursor toCursor(Cursor cursor) {
ClientProtos.Cursor.Builder builder = ClientProtos.Cursor.newBuilder();
ClientProtos.Cursor.newBuilder().setRow(ByteString.copyFrom(cursor.getRow()));
return builder.build();
}
public static ClientProtos.Cursor toCursor(Cell cell) {
return ClientProtos.Cursor.newBuilder()
.setRow(ByteString.copyFrom(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
.build();
}
public static Cursor toCursor(ClientProtos.Cursor cursor) {
return ClientUtil.createCursor(cursor.getRow().toByteArray());
}
/**
* Create a protocol buffer Get based on a client Get.
*

View File

@ -263,13 +263,13 @@ message Scan {
optional uint64 mvcc_read_point = 20 [default = 0];
optional bool include_start_row = 21 [default = true];
optional bool include_stop_row = 22 [default = false];
enum ReadType {
DEFAULT = 0;
STREAM = 1;
PREAD = 2;
}
optional ReadType readType = 23 [default = DEFAULT];
optional bool need_cursor_result = 24 [default = false];
}
/**
@ -298,6 +298,14 @@ message ScanRequest {
optional uint32 limit_of_rows = 11 [default = 0];
}
/**
* Scan cursor to tell client where we are scanning.
*
*/
message Cursor {
optional bytes row = 1;
}
/**
* The scan response. If there are no more results, more_results will
* be false. If it is not specified, it means there are more.
@ -350,6 +358,10 @@ message ScanResponse {
// make use of this mvcc_read_point when restarting a scanner to get a consistent view
// of a row.
optional uint64 mvcc_read_point = 11 [default = 0];
// If the Scan need cursor, return the row key we are scanning in heartbeat message.
// If the Scan doesn't need a cursor, don't set this field to reduce network IO.
optional Cursor cursor = 12;
}
/**

View File

@ -282,11 +282,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private final RegionScanner s;
private final Region r;
private byte[] rowOfLastPartialResult;
private boolean needCursor;
public RegionScannerHolder(String scannerName, RegionScanner s, Region r) {
public RegionScannerHolder(String scannerName, RegionScanner s, Region r, boolean needCursor) {
this.scannerName = scannerName;
this.s = s;
this.r = r;
this.needCursor = needCursor;
}
public long getNextCallSeq() {
@ -1165,11 +1167,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return lastBlock;
}
private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
throws LeaseStillHeldException {
private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r,
boolean needCursor) throws LeaseStillHeldException {
regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
new ScannerListener(scannerName));
RegionScannerHolder rsh = new RegionScannerHolder(scannerName, s, r);
RegionScannerHolder rsh = new RegionScannerHolder(scannerName, s, r, needCursor);
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
return rsh;
@ -2568,7 +2570,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
builder.setMvccReadPoint(scanner.getMvccReadPoint());
builder.setTtl(scannerLeaseTimeoutPeriod);
String scannerName = String.valueOf(scannerId);
return addScanner(scannerName, scanner, region);
return addScanner(scannerName, scanner, region, scan.isNeedCursorResult());
}
private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
@ -2764,6 +2766,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (moreRows) {
// Heartbeat messages occur when the time limit has been reached.
builder.setHeartbeatMessage(timeLimitReached);
if (timeLimitReached && rsh.needCursor) {
Cell readingCell = scannerContext.getPeekedCellInHeartbeat();
if (readingCell != null) {
builder.setCursor(ProtobufUtil.toCursor(readingCell));
}
}
}
break;
}

View File

@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.regionserver;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -94,6 +97,8 @@ public class ScannerContext {
boolean keepProgress;
private static boolean DEFAULT_KEEP_PROGRESS = false;
private Cell peekedCellInHeartbeat = null;
/**
* Tracks the relevant server side metrics during scans. null when metrics should not be tracked
*/
@ -319,6 +324,14 @@ public class ScannerContext {
|| checkTimeLimit(checkerScope);
}
public Cell getPeekedCellInHeartbeat() {
return peekedCellInHeartbeat;
}
public void setPeekedCellInHeartbeat(Cell peekedCellInHeartbeat) {
this.peekedCellInHeartbeat = peekedCellInHeartbeat;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();

View File

@ -539,6 +539,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
scannerContext.updateTimeProgress();
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
scannerContext.setPeekedCellInHeartbeat(prevCell);
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
}
}

View File

@ -0,0 +1,191 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestScannerCursor {
private static final Log LOG =
LogFactory.getLog(TestScannerCursor.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Table TABLE = null;
/**
* Table configuration
*/
private static TableName TABLE_NAME = TableName.valueOf("TestScannerCursor");
private static int NUM_ROWS = 5;
private static byte[] ROW = Bytes.toBytes("testRow");
private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
private static int NUM_FAMILIES = 2;
private static byte[] FAMILY = Bytes.toBytes("testFamily");
private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
private static int NUM_QUALIFIERS = 2;
private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
private static int VALUE_SIZE = 10;
private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
private static final int TIMEOUT = 4000;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT);
// Check the timeout condition after every cell
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
TEST_UTIL.startMiniCluster(1);
TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
}
static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
byte[][] qualifiers, byte[] cellValue) throws IOException {
Table ht = TEST_UTIL.createTable(name, families);
List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
ht.put(puts);
return ht;
}
static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
byte[] value) throws IOException {
Put put;
ArrayList<Put> puts = new ArrayList<>();
for (int row = 0; row < rows.length; row++) {
put = new Put(rows[row]);
for (int fam = 0; fam < families.length; fam++) {
for (int qual = 0; qual < qualifiers.length; qual++) {
KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
put.add(kv);
}
}
puts.add(put);
}
return puts;
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
public static class SparseFilter extends FilterBase {
@Override
public ReturnCode filterKeyValue(Cell v) throws IOException {
Threads.sleep(TIMEOUT / 2 + 100);
return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? ReturnCode.INCLUDE
: ReturnCode.SKIP;
}
public static Filter parseFrom(final byte[] pbBytes) {
return new SparseFilter();
}
}
@Test
public void testHeartbeatWithSparseFilter() throws Exception {
Scan scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setNeedCursorResult(true);
scan.setAllowPartialResults(true);
scan.setFilter(new SparseFilter());
try(ResultScanner scanner = TABLE.getScanner(scan)) {
int num = 0;
Result r;
while ((r = scanner.next()) != null) {
if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
Assert.assertTrue(r.isCursor());
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getCursor().getRow());
} else {
Assert.assertFalse(r.isCursor());
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
}
num++;
}
}
}
@Test
public void testSizeLimit() throws IOException {
Scan scan = new Scan();
scan.setMaxResultSize(1);
scan.setCaching(Integer.MAX_VALUE);
scan.setNeedCursorResult(true);
try (ResultScanner scanner = TABLE.getScanner(scan)) {
int num = 0;
Result r;
while ((r = scanner.next()) != null) {
if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS)-1) {
Assert.assertTrue(r.isCursor());
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getCursor().getRow());
} else {
Assert.assertFalse(r.isCursor());
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
}
num++;
}
}
}
}