HBASE-12761 On region jump ClientScanners should get next row start key instead of a skip.
Signed-off-by: stack <stack@apache.org> Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
This commit is contained in:
parent
2ff16e532b
commit
26749c347d
|
@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.ReversedClientScanner.createClosestRowBefore;
|
||||
|
||||
/**
|
||||
* Implements the scanner interface for the HBase client.
|
||||
* If there are multiple regions in a table, this scanner will iterate
|
||||
|
@ -92,7 +94,8 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
*/
|
||||
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
|
||||
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
|
||||
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException {
|
||||
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
|
||||
throws IOException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Scan table=" + tableName
|
||||
+ ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
|
||||
|
@ -229,7 +232,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
// Close the previous scanner if it's open
|
||||
if (this.callable != null) {
|
||||
this.callable.setClose();
|
||||
call(scan, callable, caller, scannerTimeout);
|
||||
call(callable, caller, scannerTimeout);
|
||||
this.callable = null;
|
||||
}
|
||||
|
||||
|
@ -266,7 +269,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
callable = getScannerCallable(localStartKey, nbRows);
|
||||
// Open a scanner on the region server starting at the
|
||||
// beginning of the region
|
||||
call(scan, callable, caller, scannerTimeout);
|
||||
call(callable, caller, scannerTimeout);
|
||||
this.currentRegion = callable.getHRegionInfo();
|
||||
if (this.scanMetrics != null) {
|
||||
this.scanMetrics.countOfRegions.incrementAndGet();
|
||||
|
@ -283,7 +286,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
return callable.isAnyRPCcancelled();
|
||||
}
|
||||
|
||||
static Result[] call(Scan scan, ScannerCallableWithReplicas callable,
|
||||
static Result[] call(ScannerCallableWithReplicas callable,
|
||||
RpcRetryingCaller<Result[]> caller, int scannerTimeout)
|
||||
throws IOException, RuntimeException {
|
||||
if (Thread.interrupted()) {
|
||||
|
@ -310,12 +313,12 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
|
||||
/**
|
||||
* Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
|
||||
* application or TableInputFormat. Later, we could push it to other systems. We don't use metrics
|
||||
* framework because it doesn't support multi-instances of the same metrics on the same machine;
|
||||
* for scan/map reduce scenarios, we will have multiple scans running at the same time.
|
||||
* application or TableInputFormat. Later, we could push it to other systems. We don't use
|
||||
* metrics framework because it doesn't support multi-instances of the same metrics on the same
|
||||
* machine; for scan/map reduce scenarios, we will have multiple scans running at the same time.
|
||||
*
|
||||
* By default, scan metrics are disabled; if the application wants to collect them, this behavior
|
||||
* can be turned on by calling:
|
||||
* By default, scan metrics are disabled; if the application wants to collect them, this
|
||||
* behavior can be turned on by calling:
|
||||
*
|
||||
* scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
|
||||
*/
|
||||
|
@ -343,39 +346,13 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
callable.setCaching(this.caching);
|
||||
// This flag is set when we want to skip the result returned. We do
|
||||
// this when we reset scanner because it split under us.
|
||||
boolean skipFirst = false;
|
||||
boolean retryAfterOutOfOrderException = true;
|
||||
do {
|
||||
try {
|
||||
if (skipFirst) {
|
||||
// Skip only the first row (which was the last row of the last
|
||||
// already-processed batch).
|
||||
callable.setCaching(1);
|
||||
values = call(scan, callable, caller, scannerTimeout);
|
||||
// When the replica switch happens, we need to do certain operations
|
||||
// again. The scannercallable will openScanner with the right startkey
|
||||
// but we need to pick up from there. Bypass the rest of the loop
|
||||
// and let the catch-up happen in the beginning of the loop as it
|
||||
// happens for the cases where we see exceptions. Since only openScanner
|
||||
// would have happened, values would be null
|
||||
if (values == null && callable.switchedToADifferentReplica()) {
|
||||
if (this.lastResult != null) { //only skip if there was something read earlier
|
||||
skipFirst = true;
|
||||
}
|
||||
this.currentRegion = callable.getHRegionInfo();
|
||||
continue;
|
||||
}
|
||||
callable.setCaching(this.caching);
|
||||
skipFirst = false;
|
||||
}
|
||||
// Server returns a null values if scanning is to stop. Else,
|
||||
// returns an empty array if scanning is to go on and we've just
|
||||
// exhausted current region.
|
||||
values = call(scan, callable, caller, scannerTimeout);
|
||||
if (skipFirst && values != null && values.length == 1) {
|
||||
skipFirst = false; // Already skipped, unset it before scanning again
|
||||
values = call(scan, callable, caller, scannerTimeout);
|
||||
}
|
||||
values = call(callable, caller, scannerTimeout);
|
||||
// When the replica switch happens, we need to do certain operations
|
||||
// again. The callable will openScanner with the right startkey
|
||||
// but we need to pick up from there. Bypass the rest of the loop
|
||||
|
@ -383,9 +360,6 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
// happens for the cases where we see exceptions. Since only openScanner
|
||||
// would have happened, values would be null
|
||||
if (values == null && callable.switchedToADifferentReplica()) {
|
||||
if (this.lastResult != null) { //only skip if there was something read earlier
|
||||
skipFirst = true;
|
||||
}
|
||||
this.currentRegion = callable.getHRegionInfo();
|
||||
continue;
|
||||
}
|
||||
|
@ -428,11 +402,11 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
// scanner starts at the correct row. Otherwise we may see previously
|
||||
// returned rows again.
|
||||
// (ScannerCallable by now has "relocated" the correct region)
|
||||
this.scan.setStartRow(this.lastResult.getRow());
|
||||
|
||||
// Skip first row returned. We already let it out on previous
|
||||
// invocation.
|
||||
skipFirst = true;
|
||||
if(scan.isReversed()){
|
||||
scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
|
||||
}else {
|
||||
scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
|
||||
}
|
||||
}
|
||||
if (e instanceof OutOfOrderScannerNextException) {
|
||||
if (retryAfterOutOfOrderException) {
|
||||
|
@ -452,7 +426,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
continue;
|
||||
}
|
||||
long currentTime = System.currentTimeMillis();
|
||||
if (this.scanMetrics != null ) {
|
||||
if (this.scanMetrics != null) {
|
||||
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
|
||||
}
|
||||
lastNext = currentTime;
|
||||
|
@ -487,7 +461,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
if (callable != null) {
|
||||
callable.setClose();
|
||||
try {
|
||||
call(scan, callable, caller, scannerTimeout);
|
||||
call(callable, caller, scannerTimeout);
|
||||
} catch (UnknownScannerException e) {
|
||||
// We used to catch this error, interpret, and rethrow. However, we
|
||||
// have since decided that it's not nice for a scanner's close to
|
||||
|
|
|
@ -57,7 +57,8 @@ public class ReversedClientScanner extends ClientScanner {
|
|||
TableName tableName, ClusterConnection connection,
|
||||
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory,
|
||||
ExecutorService pool, int primaryOperationTimeout) throws IOException {
|
||||
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, primaryOperationTimeout);
|
||||
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
|
||||
primaryOperationTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -166,7 +167,7 @@ public class ReversedClientScanner extends ClientScanner {
|
|||
* @param row
|
||||
* @return a new byte array which is the closest front row of the specified one
|
||||
*/
|
||||
protected byte[] createClosestRowBefore(byte[] row) {
|
||||
protected static byte[] createClosestRowBefore(byte[] row) {
|
||||
if (row == null) {
|
||||
throw new IllegalArgumentException("The passed row is empty");
|
||||
}
|
||||
|
|
|
@ -39,9 +39,13 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.ReversedClientScanner.createClosestRowBefore;
|
||||
|
||||
/**
|
||||
* This class has the logic for handling scanners for regions with and without replicas.
|
||||
* 1. A scan is attempted on the default (primary) region
|
||||
|
@ -272,8 +276,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
continue; //this was already scheduled earlier
|
||||
}
|
||||
ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
|
||||
|
||||
if (this.lastResult != null) {
|
||||
s.getScan().setStartRow(this.lastResult.getRow());
|
||||
if(s.getScan().isReversed()){
|
||||
s.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
|
||||
}else {
|
||||
s.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
|
||||
}
|
||||
}
|
||||
outstandingCallables.add(s);
|
||||
RetryingRPC retryingOnReplica = new RetryingRPC(s);
|
||||
|
|
Loading…
Reference in New Issue