From 26749c347d5ca34844743f16f57a36d99646a81f Mon Sep 17 00:00:00 2001 From: Jurriaan Mous Date: Sun, 28 Dec 2014 13:53:03 +0100 Subject: [PATCH] HBASE-12761 On region jump ClientScanners should get next row start key instead of a skip. Signed-off-by: stack Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java --- .../hadoop/hbase/client/ClientScanner.java | 66 ++++++------------- .../hbase/client/ReversedClientScanner.java | 5 +- .../client/ScannerCallableWithReplicas.java | 11 +++- 3 files changed, 33 insertions(+), 49 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 2ac5652c4f0..b4e515f520f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.util.Bytes; import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.hbase.client.ReversedClientScanner.createClosestRowBefore; + /** * Implements the scanner interface for the HBase client. 
* If there are multiple regions in a table, this scanner will iterate @@ -92,7 +94,8 @@ public class ClientScanner extends AbstractClientScanner { */ public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { + RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) + throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); @@ -229,7 +232,7 @@ public class ClientScanner extends AbstractClientScanner { // Close the previous scanner if it's open if (this.callable != null) { this.callable.setClose(); - call(scan, callable, caller, scannerTimeout); + call(callable, caller, scannerTimeout); this.callable = null; } @@ -266,7 +269,7 @@ public class ClientScanner extends AbstractClientScanner { callable = getScannerCallable(localStartKey, nbRows); // Open a scanner on the region server starting at the // beginning of the region - call(scan, callable, caller, scannerTimeout); + call(callable, caller, scannerTimeout); this.currentRegion = callable.getHRegionInfo(); if (this.scanMetrics != null) { this.scanMetrics.countOfRegions.incrementAndGet(); @@ -283,7 +286,7 @@ public class ClientScanner extends AbstractClientScanner { return callable.isAnyRPCcancelled(); } - static Result[] call(Scan scan, ScannerCallableWithReplicas callable, + static Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller caller, int scannerTimeout) throws IOException, RuntimeException { if (Thread.interrupted()) { @@ -310,12 +313,12 @@ public class ClientScanner extends AbstractClientScanner { /** * Publish the scan metrics. 
For now, we use scan.setAttribute to pass the metrics back to the - * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics - * framework because it doesn't support multi-instances of the same metrics on the same machine; - * for scan/map reduce scenarios, we will have multiple scans running at the same time. + * application or TableInputFormat. Later, we could push it to other systems. We don't use + * metrics framework because it doesn't support multi-instances of the same metrics on the same + * machine; for scan/map reduce scenarios, we will have multiple scans running at the same time. * - * By default, scan metrics are disabled; if the application wants to collect them, this behavior - * can be turned on by calling calling: + * By default, scan metrics are disabled; if the application wants to collect them, this + * behavior can be turned on by calling: * * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)) */ @@ -343,39 +346,13 @@ public class ClientScanner extends AbstractClientScanner { callable.setCaching(this.caching); // This flag is set when we want to skip the result returned. We do // this when we reset scanner because it split under us. - boolean skipFirst = false; boolean retryAfterOutOfOrderException = true; do { try { - if (skipFirst) { - // Skip only the first row (which was the last row of the last - // already-processed batch). - callable.setCaching(1); - values = call(scan, callable, caller, scannerTimeout); - // When the replica switch happens, we need to do certain operations - // again. The scannercallable will openScanner with the right startkey - // but we need to pick up from there. Bypass the rest of the loop - // and let the catch-up happen in the beginning of the loop as it - // happens for the cases where we see exceptions. 
Since only openScanner - // would have happened, values would be null - if (values == null && callable.switchedToADifferentReplica()) { - if (this.lastResult != null) { //only skip if there was something read earlier - skipFirst = true; - } - this.currentRegion = callable.getHRegionInfo(); - continue; - } - callable.setCaching(this.caching); - skipFirst = false; - } // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just // exhausted current region. - values = call(scan, callable, caller, scannerTimeout); - if (skipFirst && values != null && values.length == 1) { - skipFirst = false; // Already skipped, unset it before scanning again - values = call(scan, callable, caller, scannerTimeout); - } + values = call(callable, caller, scannerTimeout); // When the replica switch happens, we need to do certain operations // again. The callable will openScanner with the right startkey // but we need to pick up from there. Bypass the rest of the loop @@ -383,9 +360,6 @@ public class ClientScanner extends AbstractClientScanner { // happens for the cases where we see exceptions. Since only openScanner // would have happened, values would be null if (values == null && callable.switchedToADifferentReplica()) { - if (this.lastResult != null) { //only skip if there was something read earlier - skipFirst = true; - } this.currentRegion = callable.getHRegionInfo(); continue; } @@ -428,11 +402,11 @@ public class ClientScanner extends AbstractClientScanner { // scanner starts at the correct row. Otherwise we may see previously // returned rows again. // (ScannerCallable by now has "relocated" the correct region) - this.scan.setStartRow(this.lastResult.getRow()); - - // Skip first row returned. We already let it out on previous - // invocation. 
- skipFirst = true; + if(scan.isReversed()){ + scan.setStartRow(createClosestRowBefore(lastResult.getRow())); + }else { + scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); + } } if (e instanceof OutOfOrderScannerNextException) { if (retryAfterOutOfOrderException) { @@ -452,7 +426,7 @@ public class ClientScanner extends AbstractClientScanner { continue; } long currentTime = System.currentTimeMillis(); - if (this.scanMetrics != null ) { + if (this.scanMetrics != null) { this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext); } lastNext = currentTime; @@ -487,7 +461,7 @@ public class ClientScanner extends AbstractClientScanner { if (callable != null) { callable.setClose(); try { - call(scan, callable, caller, scannerTimeout); + call(callable, caller, scannerTimeout); } catch (UnknownScannerException e) { // We used to catch this error, interpret, and rethrow. However, we // have since decided that it's not nice for a scanner's close to diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index a03858e725b..8681e19a6ba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -57,7 +57,8 @@ public class ReversedClientScanner extends ClientScanner { TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { - super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, primaryOperationTimeout); + super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, + primaryOperationTimeout); } @Override @@ -166,7 +167,7 @@ public class ReversedClientScanner extends ClientScanner { * @param row * @return a new 
byte array which is the closest front row of the specified one */ - protected byte[] createClosestRowBefore(byte[] row) { + protected static byte[] createClosestRowBefore(byte[] row) { if (row == null) { throw new IllegalArgumentException("The passed row is empty"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 8b6fc5cd6d0..23a3ff56752 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -39,9 +39,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import com.google.common.annotations.VisibleForTesting; + +import static org.apache.hadoop.hbase.client.ReversedClientScanner.createClosestRowBefore; + /** * This class has the logic for handling scanners for regions with and without replicas. * 1. A scan is attempted on the default (primary) region @@ -272,8 +276,13 @@ class ScannerCallableWithReplicas implements RetryingCallable { continue; //this was already scheduled earlier } ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id); + if (this.lastResult != null) { - s.getScan().setStartRow(this.lastResult.getRow()); + if(s.getScan().isReversed()){ + s.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow())); + }else { + s.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1])); + } } outstandingCallables.add(s); RetryingRPC retryingOnReplica = new RetryingRPC(s);