HBASE-15576 Scanning cursor to prevent blocking long time on ResultScanner.next()

This commit is contained in:
Phil Yang 2017-05-25 15:18:58 +08:00
parent 80e15aac21
commit 2f1923a823
16 changed files with 2344 additions and 241 deletions

View File

@ -499,6 +499,21 @@ public abstract class ClientScanner extends AbstractClientScanner {
break; break;
} }
} }
if (cache.isEmpty() && !closed && scan.isNeedCursorResult()) {
if (callable.isHeartbeatMessage() && callable.getCursor() != null) {
// Use cursor row key from server
cache.add(Result.createCursorResult(callable.getCursor()));
break;
}
if (values.length > 0) {
// It is size limit exceed and we need return the last Result's row.
// When user setBatch and the scanner is reopened, the server may return Results that
// user has seen and the last Result can not be seen because the number is not enough.
// So the row keys of results may not be same, we must use the last one.
cache.add(Result.createCursorResult(new Cursor(values[values.length - 1].getRow())));
break;
}
}
if (countdown <= 0) { if (countdown <= 0) {
// we have enough result. // we have enough result.
closeScannerIfExhausted(regionExhausted); closeScannerIfExhausted(regionExhausted);

View File

@ -27,4 +27,8 @@ public class ClientUtil {
public static boolean areScanStartRowAndStopRowEqual(byte[] startRow, byte[] stopRow) { public static boolean areScanStartRowAndStopRowEqual(byte[] startRow, byte[] stopRow) {
return startRow != null && startRow.length > 0 && Bytes.equals(startRow, stopRow); return startRow != null && startRow.length > 0 && Bytes.equals(startRow, stopRow);
} }
public static Cursor createCursor(byte[] row) {
return new Cursor(row);
}
} }

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Scan cursor to tell client where server is scanning
* {@link Scan#setNeedCursorResult(boolean)}
* {@link Result#isCursor()}
* {@link Result#getCursor()}
*/
@InterfaceAudience.Public
public class Cursor {
private final byte[] row;
Cursor(byte[] row) {
this.row = row;
}
public byte[] getRow() {
return row;
}
}

View File

@ -108,6 +108,8 @@ public class Result implements CellScannable, CellScanner {
private final boolean readonly; private final boolean readonly;
private Cursor cursor = null;
/** /**
* Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #rawCells()}. * Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #rawCells()}.
* Use this to represent no results if {@code null} won't do or in old 'mapred' as opposed * Use this to represent no results if {@code null} won't do or in old 'mapred' as opposed
@ -173,6 +175,15 @@ public class Result implements CellScannable, CellScanner {
return new Result(cells, null, stale, mayHaveMoreCellsInRow); return new Result(cells, null, stale, mayHaveMoreCellsInRow);
} }
public static Result createCursorResult(Cursor cursor) {
return new Result(cursor);
}
private Result(Cursor cursor) {
this.cursor = cursor;
this.readonly = false;
}
/** Private ctor. Use {@link #create(Cell[])}. */ /** Private ctor. Use {@link #create(Cell[])}. */
private Result(Cell[] cells, Boolean exists, boolean stale, boolean mayHaveMoreCellsInRow) { private Result(Cell[] cells, Boolean exists, boolean stale, boolean mayHaveMoreCellsInRow) {
this.cells = cells; this.cells = cells;
@ -948,4 +959,38 @@ public class Result implements CellScannable, CellScanner {
throw new UnsupportedOperationException("Attempting to modify readonly EMPTY_RESULT!"); throw new UnsupportedOperationException("Attempting to modify readonly EMPTY_RESULT!");
} }
} }
/**
* Return true if this Result is a cursor to tell users where the server has scanned.
* In this Result the only meaningful method is {@link #getCursor()}.
*
* {@code
* while (r = scanner.next() && r != null) {
* if(r.isCursor()){
* // scanning is not end, it is a cursor, save its row key and close scanner if you want, or
* // just continue the loop to call next().
* } else {
* // just like before
* }
* }
* // scanning is end
*
* }
* {@link Scan#setNeedCursorResult(boolean)}
* {@link Cursor}
* {@link #getCursor()}
*/
public boolean isCursor() {
return cursor != null ;
}
/**
* Return the cursor if this Result is a cursor result.
* {@link Scan#setNeedCursorResult(boolean)}
* {@link Cursor}
* {@link #isCursor()}
*/
public Cursor getCursor(){
return cursor;
}
} }

View File

@ -185,6 +185,9 @@ public class Scan extends Query {
* Control whether to use pread at server side. * Control whether to use pread at server side.
*/ */
private ReadType readType = ReadType.DEFAULT; private ReadType readType = ReadType.DEFAULT;
private boolean needCursorResult = false;
/** /**
* Create a Scan operation across all rows. * Create a Scan operation across all rows.
*/ */
@ -272,6 +275,7 @@ public class Scan extends Query {
} }
this.mvccReadPoint = scan.getMvccReadPoint(); this.mvccReadPoint = scan.getMvccReadPoint();
this.limit = scan.getLimit(); this.limit = scan.getLimit();
this.needCursorResult = scan.isNeedCursorResult();
} }
/** /**
@ -1170,4 +1174,43 @@ public class Scan extends Query {
Scan resetMvccReadPoint() { Scan resetMvccReadPoint() {
return setMvccReadPoint(-1L); return setMvccReadPoint(-1L);
} }
/**
* When the server is slow or we scan a table with many deleted data or we use a sparse filter,
* the server will response heartbeat to prevent timeout. However the scanner will return a Result
* only when client can do it. So if there are many heartbeats, the blocking time on
* ResultScanner#next() may be very long, which is not friendly to online services.
*
* Set this to true then you can get a special Result whose #isCursor() returns true and is not
* contains any real data. It only tells you where the server has scanned. You can call next
* to continue scanning or open a new scanner with this row key as start row whenever you want.
*
* Users can get a cursor when and only when there is a response from the server but we can not
* return a Result to users, for example, this response is a heartbeat or there are partial cells
* but users do not allow partial result.
*
* Now the cursor is in row level which means the special Result will only contains a row key.
* {@link Result#isCursor()}
* {@link Result#getCursor()}
* {@link Cursor}
*/
public Scan setNeedCursorResult(boolean needCursorResult) {
this.needCursorResult = needCursorResult;
return this;
}
public boolean isNeedCursorResult() {
return needCursorResult;
}
/**
* Create a new Scan with a cursor. It only set the position information like start row key.
* The others (like cfs, stop row, limit) should still be filled in by the user.
* {@link Result#isCursor()}
* {@link Result#getCursor()}
* {@link Cursor}
*/
public static Scan createScanFromCursor(Cursor cursor) {
return new Scan().withStartRow(cursor.getRow());
}
} }

View File

@ -87,6 +87,8 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
*/ */
protected boolean heartbeatMessage = false; protected boolean heartbeatMessage = false;
protected Cursor cursor;
// indicate if it is a remote server call // indicate if it is a remote server call
protected boolean isRegionServerRemote = true; protected boolean isRegionServerRemote = true;
private long nextCallSeq = 0; private long nextCallSeq = 0;
@ -148,7 +150,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
checkIfRegionServerIsRemote(); checkIfRegionServerIsRemote();
instantiated = true; instantiated = true;
} }
cursor = null;
// check how often we retry. // check how often we retry.
if (reload) { if (reload) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
@ -242,7 +244,11 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
response = next(); response = next();
} }
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage();
setHeartbeatMessage(isHeartBeat);
if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) {
cursor = ProtobufUtil.toCursor(response.getCursor());
}
Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response); Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
if (logScannerActivity) { if (logScannerActivity) {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
@ -288,6 +294,10 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
return heartbeatMessage; return heartbeatMessage;
} }
public Cursor getCursor() {
return cursor;
}
private void setHeartbeatMessage(boolean heartbeatMessage) { private void setHeartbeatMessage(boolean heartbeatMessage) {
this.heartbeatMessage = heartbeatMessage; this.heartbeatMessage = heartbeatMessage;
} }

View File

@ -302,6 +302,10 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage(); return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
} }
public Cursor getCursor() {
return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
}
private void addCallsForCurrentReplica( private void addCallsForCurrentReplica(
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) { ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClientUtil; import org.apache.hadoop.hbase.client.ClientUtil;
import org.apache.hadoop.hbase.client.CompactionState; import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Cursor;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
@ -185,6 +186,7 @@ import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapreduce.tools.CLI;
/** /**
* Protobufs utility. * Protobufs utility.
@ -1099,6 +1101,9 @@ public final class ProtobufUtil {
if (scan.getReadType() != Scan.ReadType.DEFAULT) { if (scan.getReadType() != Scan.ReadType.DEFAULT) {
scanBuilder.setReadType(toReadType(scan.getReadType())); scanBuilder.setReadType(toReadType(scan.getReadType()));
} }
if (scan.isNeedCursorResult()) {
scanBuilder.setNeedCursorResult(true);
}
return scanBuilder.build(); return scanBuilder.build();
} }
@ -1207,9 +1212,28 @@ public final class ProtobufUtil {
} else if (proto.hasReadType()) { } else if (proto.hasReadType()) {
scan.setReadType(toReadType(proto.getReadType())); scan.setReadType(toReadType(proto.getReadType()));
} }
if (proto.getNeedCursorResult()) {
scan.setNeedCursorResult(true);
}
return scan; return scan;
} }
public static ClientProtos.Cursor toCursor(Cursor cursor) {
ClientProtos.Cursor.Builder builder = ClientProtos.Cursor.newBuilder();
ClientProtos.Cursor.newBuilder().setRow(ByteString.copyFrom(cursor.getRow()));
return builder.build();
}
public static ClientProtos.Cursor toCursor(Cell cell) {
return ClientProtos.Cursor.newBuilder()
.setRow(ByteString.copyFrom(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
.build();
}
public static Cursor toCursor(ClientProtos.Cursor cursor) {
return ClientUtil.createCursor(cursor.getRow().toByteArray());
}
/** /**
* Create a protocol buffer Get based on a client Get. * Create a protocol buffer Get based on a client Get.
* *

View File

@ -259,13 +259,13 @@ message Scan {
optional uint64 mvcc_read_point = 20 [default = 0]; optional uint64 mvcc_read_point = 20 [default = 0];
optional bool include_start_row = 21 [default = true]; optional bool include_start_row = 21 [default = true];
optional bool include_stop_row = 22 [default = false]; optional bool include_stop_row = 22 [default = false];
enum ReadType { enum ReadType {
DEFAULT = 0; DEFAULT = 0;
STREAM = 1; STREAM = 1;
PREAD = 2; PREAD = 2;
} }
optional ReadType readType = 23 [default = DEFAULT]; optional ReadType readType = 23 [default = DEFAULT];
optional bool need_cursor_result = 24 [default = false];
} }
/** /**
@ -294,6 +294,14 @@ message ScanRequest {
optional uint32 limit_of_rows = 11 [default = 0]; optional uint32 limit_of_rows = 11 [default = 0];
} }
/**
* Scan cursor to tell client where we are scanning.
*
*/
message Cursor {
optional bytes row = 1;
}
/** /**
* The scan response. If there are no more results, more_results will * The scan response. If there are no more results, more_results will
* be false. If it is not specified, it means there are more. * be false. If it is not specified, it means there are more.
@ -346,6 +354,10 @@ message ScanResponse {
// make use of this mvcc_read_point when restarting a scanner to get a consistent view // make use of this mvcc_read_point when restarting a scanner to get a consistent view
// of a row. // of a row.
optional uint64 mvcc_read_point = 11 [default = 0]; optional uint64 mvcc_read_point = 11 [default = 0];
// If the Scan need cursor, return the row key we are scanning in heartbeat message.
// If the Scan doesn't need a cursor, don't set this field to reduce network IO.
optional Cursor cursor = 12;
} }
/** /**

View File

@ -259,13 +259,13 @@ message Scan {
optional uint64 mvcc_read_point = 20 [default = 0]; optional uint64 mvcc_read_point = 20 [default = 0];
optional bool include_start_row = 21 [default = true]; optional bool include_start_row = 21 [default = true];
optional bool include_stop_row = 22 [default = false]; optional bool include_stop_row = 22 [default = false];
enum ReadType { enum ReadType {
DEFAULT = 0; DEFAULT = 0;
STREAM = 1; STREAM = 1;
PREAD = 2; PREAD = 2;
} }
optional ReadType readType = 23 [default = DEFAULT]; optional ReadType readType = 23 [default = DEFAULT];
optional bool need_cursor_result = 24 [default = false];
} }
/** /**
@ -294,6 +294,14 @@ message ScanRequest {
optional uint32 limit_of_rows = 11 [default = 0]; optional uint32 limit_of_rows = 11 [default = 0];
} }
/**
* Scan cursor to tell client where we are scanning.
*
*/
message Cursor {
optional bytes row = 1;
}
/** /**
* The scan response. If there are no more results, more_results will * The scan response. If there are no more results, more_results will
* be false. If it is not specified, it means there are more. * be false. If it is not specified, it means there are more.
@ -346,6 +354,10 @@ message ScanResponse {
// make use of this mvcc_read_point when restarting a scanner to get a consistent view // make use of this mvcc_read_point when restarting a scanner to get a consistent view
// of a row. // of a row.
optional uint64 mvcc_read_point = 11 [default = 0]; optional uint64 mvcc_read_point = 11 [default = 0];
// If the Scan need cursor, return the row key we are scanning in heartbeat message.
// If the Scan doesn't need a cursor, don't set this field to reduce network IO.
optional Cursor cursor = 12;
} }
/** /**

View File

@ -380,14 +380,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private final RpcCallback closeCallBack; private final RpcCallback closeCallBack;
private final RpcCallback shippedCallback; private final RpcCallback shippedCallback;
private byte[] rowOfLastPartialResult; private byte[] rowOfLastPartialResult;
private boolean needCursor;
public RegionScannerHolder(String scannerName, RegionScanner s, Region r, public RegionScannerHolder(String scannerName, RegionScanner s, Region r,
RpcCallback closeCallBack, RpcCallback shippedCallback) { RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor) {
this.scannerName = scannerName; this.scannerName = scannerName;
this.s = s; this.s = s;
this.r = r; this.r = r;
this.closeCallBack = closeCallBack; this.closeCallBack = closeCallBack;
this.shippedCallback = shippedCallback; this.shippedCallback = shippedCallback;
this.needCursor = needCursor;
} }
public long getNextCallSeq() { public long getNextCallSeq() {
@ -1295,8 +1297,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return lastBlock; return lastBlock;
} }
private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r) private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r,
throws LeaseStillHeldException { boolean needCursor) throws LeaseStillHeldException {
Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
new ScannerListener(scannerName)); new ScannerListener(scannerName));
RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease); RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease);
@ -1307,7 +1309,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
closeCallback = new RegionScannerCloseCallBack(s); closeCallback = new RegionScannerCloseCallBack(s);
} }
RegionScannerHolder rsh = RegionScannerHolder rsh =
new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback); new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback, needCursor);
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!"; assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
return rsh; return rsh;
@ -2857,7 +2859,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
builder.setMvccReadPoint(scanner.getMvccReadPoint()); builder.setMvccReadPoint(scanner.getMvccReadPoint());
builder.setTtl(scannerLeaseTimeoutPeriod); builder.setTtl(scannerLeaseTimeoutPeriod);
String scannerName = String.valueOf(scannerId); String scannerName = String.valueOf(scannerId);
return addScanner(scannerName, scanner, region); return addScanner(scannerName, scanner, region, scan.isNeedCursorResult());
} }
private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
@ -3054,6 +3056,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (moreRows) { if (moreRows) {
// Heartbeat messages occur when the time limit has been reached. // Heartbeat messages occur when the time limit has been reached.
builder.setHeartbeatMessage(timeLimitReached); builder.setHeartbeatMessage(timeLimitReached);
if (timeLimitReached && rsh.needCursor) {
Cell readingCell = scannerContext.getPeekedCellInHeartbeat();
if (readingCell != null ) {
builder.setCursor(ProtobufUtil.toCursor(readingCell));
}
}
} }
break; break;
} }

View File

@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.regionserver;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -94,6 +97,8 @@ public class ScannerContext {
boolean keepProgress; boolean keepProgress;
private static boolean DEFAULT_KEEP_PROGRESS = false; private static boolean DEFAULT_KEEP_PROGRESS = false;
private Cell peekedCellInHeartbeat = null;
/** /**
* Tracks the relevant server side metrics during scans. null when metrics should not be tracked * Tracks the relevant server side metrics during scans. null when metrics should not be tracked
*/ */
@ -328,6 +333,14 @@ public class ScannerContext {
|| checkTimeLimit(checkerScope); || checkTimeLimit(checkerScope);
} }
public Cell getPeekedCellInHeartbeat() {
return peekedCellInHeartbeat;
}
public void setPeekedCellInHeartbeat(Cell peekedCellInHeartbeat) {
this.peekedCellInHeartbeat = peekedCellInHeartbeat;
}
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();

View File

@ -585,6 +585,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if ((kvsScanned % cellsPerHeartbeatCheck == 0)) { if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
scannerContext.updateTimeProgress(); scannerContext.updateTimeProgress();
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
scannerContext.setPeekedCellInHeartbeat(prevCell);
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
} }
} }

View File

@ -0,0 +1,191 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestScannerCursor {
private static final Log LOG =
LogFactory.getLog(TestScannerCursor.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Table TABLE = null;
/**
* Table configuration
*/
private static TableName TABLE_NAME = TableName.valueOf("TestScannerCursor");
private static int NUM_ROWS = 5;
private static byte[] ROW = Bytes.toBytes("testRow");
private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
private static int NUM_FAMILIES = 2;
private static byte[] FAMILY = Bytes.toBytes("testFamily");
private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
private static int NUM_QUALIFIERS = 2;
private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
private static int VALUE_SIZE = 10;
private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
private static final int TIMEOUT = 4000;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT);
// Check the timeout condition after every cell
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
TEST_UTIL.startMiniCluster(1);
TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
}
static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
byte[][] qualifiers, byte[] cellValue) throws IOException {
Table ht = TEST_UTIL.createTable(name, families);
List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
ht.put(puts);
return ht;
}
static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
byte[] value) throws IOException {
Put put;
ArrayList<Put> puts = new ArrayList<>();
for (int row = 0; row < rows.length; row++) {
put = new Put(rows[row]);
for (int fam = 0; fam < families.length; fam++) {
for (int qual = 0; qual < qualifiers.length; qual++) {
KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
put.add(kv);
}
}
puts.add(put);
}
return puts;
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
public static class SparseFilter extends FilterBase {
@Override
public ReturnCode filterKeyValue(Cell v) throws IOException {
Threads.sleep(TIMEOUT / 2 + 100);
return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? ReturnCode.INCLUDE
: ReturnCode.SKIP;
}
public static Filter parseFrom(final byte[] pbBytes) {
return new SparseFilter();
}
}
@Test
public void testHeartbeatWithSparseFilter() throws Exception {
Scan scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setNeedCursorResult(true);
scan.setAllowPartialResults(true);
scan.setFilter(new SparseFilter());
try(ResultScanner scanner = TABLE.getScanner(scan)) {
int num = 0;
Result r;
while ((r = scanner.next()) != null) {
if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
Assert.assertTrue(r.isCursor());
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getCursor().getRow());
} else {
Assert.assertFalse(r.isCursor());
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
}
num++;
}
}
}
@Test
public void testSizeLimit() throws IOException {
Scan scan = new Scan();
scan.setMaxResultSize(1);
scan.setCaching(Integer.MAX_VALUE);
scan.setNeedCursorResult(true);
try (ResultScanner scanner = TABLE.getScanner(scan)) {
int num = 0;
Result r;
while ((r = scanner.next()) != null) {
if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS)-1) {
Assert.assertTrue(r.isCursor());
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getCursor().getRow());
} else {
Assert.assertFalse(r.isCursor());
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
}
num++;
}
}
}
}