diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 52027ffbc3f..9ff36f8686a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; - import com.google.common.annotations.VisibleForTesting; import java.io.IOException; @@ -55,17 +52,15 @@ import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.Bytes; -import com.google.common.annotations.VisibleForTesting; - /** * Implements the scanner interface for the HBase client. If there are multiple regions in a table, * this scanner will iterate through them all. */ @InterfaceAudience.Private -public class ClientScanner extends AbstractClientScanner { +public abstract class ClientScanner extends AbstractClientScanner { private static final Log LOG = LogFactory.getLog(ClientScanner.class); - protected Scan scan; + protected final Scan scan; protected boolean closed = false; // Current region scanner is against. Gets cleared if current region goes // wonky: e.g. if it splits on us. @@ -219,89 +214,71 @@ public class ClientScanner extends AbstractClientScanner { return maxScannerResultSize; } - // returns true if the passed region endKey - protected boolean checkScanStopRow(final byte[] endKey) { - if (this.scan.getStopRow().length > 0) { - // there is a stop row, check to see if we are past it. - byte[] stopRow = scan.getStopRow(); - int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, endKey, 0, endKey.length); - if (cmp <= 0) { - // stopRow <= endKey (endKey is equals to or larger than stopRow) - // This is a stop. - return true; - } - } - return false; // unlikely. - } - - protected final void closeScanner() throws IOException { + private void closeScanner() throws IOException { if (this.callable != null) { this.callable.setClose(); - call(callable, caller, scannerTimeout); + call(callable, caller, scannerTimeout, false); this.callable = null; } } /** - * Gets a scanner for the next region. If this.currentRegion != null, then we will move to the - * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). - * @param nbRows the caching option of the scan - * @return the results fetched when open scanner, or null which means terminate the scan. + * Will be called in moveToNextRegion when currentRegion is null. Abstract because for normal + * scan, we will start next scan from the endKey of the currentRegion, and for reversed scan, we + * will start next scan from the startKey of the currentRegion. + * @return {@code false} if we have reached the stop row. Otherwise {@code true}. */ - protected Result[] nextScanner(int nbRows) throws IOException { + protected abstract boolean setNewStartKey(); + + /** + * Will be called in moveToNextRegion to create ScannerCallable. Abstract because for reversed + * scan we need to create a ReversedScannerCallable. + */ + protected abstract ScannerCallable createScannerCallable(); + + /** + * Close the previous scanner and create a new ScannerCallable for the next scanner. + *

+ * Marked as protected only because TestClientScanner need to override this method. + * @return false if we should terminate the scan. Otherwise + */ + @VisibleForTesting + protected boolean moveToNextRegion() { // Close the previous scanner if it's open - closeScanner(); - - // Where to start the next scanner - byte[] localStartKey; - - // if we're at end of table, close and return null to stop iterating - if (this.currentRegion != null) { - byte[] endKey = this.currentRegion.getEndKey(); - if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) || - checkScanStopRow(endKey)) { - close(); - if (LOG.isTraceEnabled()) { - LOG.trace("Finished " + this.currentRegion); - } - return null; + try { + closeScanner(); + } catch (IOException e) { + // not a big deal continue + if (LOG.isDebugEnabled()) { + LOG.debug("close scanner for " + currentRegion + " failed", e); + } + } + if (currentRegion != null) { + if (!setNewStartKey()) { + return false; } - localStartKey = endKey; - // clear mvcc read point if we are going to switch regions scan.resetMvccReadPoint(); if (LOG.isTraceEnabled()) { LOG.trace("Finished " + this.currentRegion); } - } else { - localStartKey = this.scan.getStartRow(); } - if (LOG.isDebugEnabled() && this.currentRegion != null) { // Only worth logging if NOT first region in scan. LOG.debug( - "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); + "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(scan.getStartRow()) + + "', " + (scan.includeStartRow() ? "inclusive" : "exclusive")); } - try { - callable = getScannerCallable(localStartKey, nbRows); - // Open a scanner on the region server starting at the - // beginning of the region - Result[] rrs = call(callable, caller, scannerTimeout); - this.currentRegion = callable.getHRegionInfo(); - if (this.scanMetrics != null) { - this.scanMetrics.countOfRegions.incrementAndGet(); - } - if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) { - // no results for the scan, return null to terminate the scan. - closed = true; - callable = null; - currentRegion = null; - return null; - } - return rrs; - } catch (IOException e) { - closeScanner(); - throw e; + // clear the current region, we will set a new value to it after the first call of the new + // callable. + this.currentRegion = null; + this.callable = + new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool, + primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller); + this.callable.setCaching(this.caching); + if (this.scanMetrics != null) { + this.scanMetrics.countOfRegions.incrementAndGet(); } + return true; } @VisibleForTesting @@ -310,24 +287,17 @@ public class ClientScanner extends AbstractClientScanner { } private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller caller, - int scannerTimeout) throws IOException { + int scannerTimeout, boolean updateCurrentRegion) throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException(); } // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, // we do a callWithRetries - return caller.callWithoutRetries(callable, scannerTimeout); - } - - @InterfaceAudience.Private - protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey, int nbRows) { - scan.setStartRow(localStartKey); - ScannerCallable s = new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, - this.rpcControllerFactory); - s.setCaching(nbRows); - ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(), s, - pool, primaryOperationTimeout, scan, retries, scannerTimeout, caching, conf, caller); - return sr; + Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout); + if (currentRegion == null && updateCurrentRegion) { + currentRegion = callable.getHRegionInfo(); + } + return rrs; } /** @@ -374,9 +344,7 @@ public class ClientScanner extends AbstractClientScanner { } private boolean scanExhausted(Result[] values) { - // This means the server tells us the whole scan operation is done. Usually decided by filter or - // limit. - return values == null || callable.moreResultsForScan() == MoreResults.NO; + return callable.moreResultsForScan() == MoreResults.NO; } private boolean regionExhausted(Result[] values) { @@ -384,8 +352,8 @@ public class ClientScanner extends AbstractClientScanner { // old time we always return empty result for a open scanner operation so we add a check here to // keep compatible with the old logic. Should remove the isOpenScanner in the future. // 2. Server tells us that it has no more results for this region. - return (values.length == 0 && !callable.isHeartbeatMessage() && !callable.isOpenScanner()) - || callable.moreResultsInRegion() == MoreResults.NO; + return (values.length == 0 && !callable.isHeartbeatMessage()) || + callable.moreResultsInRegion() == MoreResults.NO; } private void closeScannerIfExhausted(boolean exhausted) throws IOException { @@ -393,25 +361,14 @@ public class ClientScanner extends AbstractClientScanner { if (!partialResults.isEmpty()) { // XXX: continue if there are partial results. But in fact server should not set // hasMoreResults to false if there are partial results. - LOG.warn("Server tells us there is no more results for this region but we still have" - + " partialResults, this should not happen, retry on the current scanner anyway"); + LOG.warn("Server tells us there is no more results for this region but we still have" + + " partialResults, this should not happen, retry on the current scanner anyway"); } else { closeScanner(); } } } - private Result[] nextScannerWithRetries(int nbRows) throws IOException { - int retriesLeft = getRetries(); - for (;;) { - try { - return nextScanner(nbRows); - } catch (DoNotRetryIOException e) { - handleScanError(e, null, retriesLeft--); - } - } - } - private void handleScanError(DoNotRetryIOException e, MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException { // An exception was thrown which makes any partial results that we were collecting @@ -451,27 +408,18 @@ public class ClientScanner extends AbstractClientScanner { // The region has moved. We need to open a brand new scanner at the new location. // Reset the startRow to the row we've seen last so that the new scanner starts at // the correct row. Otherwise we may see previously returned rows again. - // (ScannerCallable by now has "relocated" the correct region) - if (!this.lastResult.isPartial() && scan.getBatch() < 0) { - if (scan.isReversed()) { - scan.setStartRow(createClosestRowBefore(lastResult.getRow())); - } else { - scan.setStartRow(createClosestRowAfter(lastResult.getRow())); - } - } else { - // we need rescan this row because we only loaded partial row before - scan.setStartRow(lastResult.getRow()); - } + // If the lastRow is not partial, then we should start from the next row. As now we can + // exclude the start row, the logic here is the same for both normal scan and reversed scan. + // If lastResult is partial then include it, otherwise exclude it. + scan.withStartRow(lastResult.getRow(), lastResult.isPartial() || scan.getBatch() > 0); } if (e instanceof OutOfOrderScannerNextException) { - if (retryAfterOutOfOrderException != null) { - if (retryAfterOutOfOrderException.isTrue()) { - retryAfterOutOfOrderException.setValue(false); - } else { - // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE? - throw new DoNotRetryIOException( - "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e); - } + if (retryAfterOutOfOrderException.isTrue()) { + retryAfterOutOfOrderException.setValue(false); + } else { + // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE? + throw new DoNotRetryIOException( + "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e); } } // Clear region. @@ -489,18 +437,14 @@ public class ClientScanner extends AbstractClientScanner { if (closed) { return; } - Result[] values = null; long remainingResultSize = maxScannerResultSize; int countdown = this.caching; // This is possible if we just stopped at the boundary of a region in the previous call. if (callable == null) { - values = nextScannerWithRetries(countdown); - if (values == null) { + if (!moveToNextRegion()) { return; } } - // We need to reset it if it's a new callable that was created with a countdown in nextScanner - callable.setCaching(this.caching); // This flag is set when we want to skip the result returned. We do // this when we reset scanner because it split under us. MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true); @@ -508,15 +452,14 @@ public class ClientScanner extends AbstractClientScanner { // make sure that we are not retrying indefinitely. int retriesLeft = getRetries(); for (;;) { + Result[] values; try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just // exhausted current region. // now we will also fetch data when openScanner, so do not make a next call again if values // is already non-null. - if (values == null) { - values = call(callable, caller, scannerTimeout); - } + values = call(callable, caller, scannerTimeout, true); // When the replica switch happens, we need to do certain operations again. // The callable will openScanner with the right startkey but we need to pick up // from there. Bypass the rest of the loop and let the catch-up happen in the beginning @@ -526,19 +469,12 @@ public class ClientScanner extends AbstractClientScanner { // openScanner with the correct startkey and we must pick up from there clearPartialResults(); this.currentRegion = callable.getHRegionInfo(); - // Now we will also fetch data when openScanner so usually we should not get a null - // result, but at some places we still use null to indicate the scan is terminated, so add - // a sanity check here. Should be removed later. - if (values == null) { - continue; - } } retryAfterOutOfOrderException.setValue(true); } catch (DoNotRetryIOException e) { handleScanError(e, retryAfterOutOfOrderException, retriesLeft--); // reopen the scanner - values = nextScannerWithRetries(countdown); - if (values == null) { + if (!moveToNextRegion()) { break; } continue; @@ -590,8 +526,8 @@ public class ClientScanner extends AbstractClientScanner { // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing // unnecesary delays to the caller if (LOG.isTraceEnabled()) { - LOG.trace("Heartbeat message received and cache contains Results." - + " Breaking out of scan loop"); + LOG.trace("Heartbeat message received and cache contains Results." + + " Breaking out of scan loop"); } // we know that the region has not been exhausted yet so just break without calling // closeScannerIfExhausted @@ -618,17 +554,13 @@ public class ClientScanner extends AbstractClientScanner { if (!partialResults.isEmpty()) { // XXX: continue if there are partial results. But in fact server should not set // hasMoreResults to false if there are partial results. - LOG.warn("Server tells us there is no more results for this region but we still have" - + " partialResults, this should not happen, retry on the current scanner anyway"); - values = null; // reset values for the next call + LOG.warn("Server tells us there is no more results for this region but we still have" + + " partialResults, this should not happen, retry on the current scanner anyway"); continue; } - values = nextScannerWithRetries(countdown); - if (values == null) { + if (!moveToNextRegion()) { break; } - } else { - values = null; // reset values for the next call } } } @@ -769,9 +701,9 @@ public class ClientScanner extends AbstractClientScanner { private void addToPartialResults(final Result result) throws IOException { final byte[] row = result.getRow(); if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) { - throw new IOException("Partial result row does not match. All partial results must come " - + "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: " - + Bytes.toString(row)); + throw new IOException("Partial result row does not match. All partial results must come " + + "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: " + + Bytes.toString(row)); } partialResultsRow = row; partialResults.add(result); @@ -806,7 +738,7 @@ public class ClientScanner extends AbstractClientScanner { if (callable != null) { callable.setClose(); try { - call(callable, caller, scannerTimeout); + call(callable, caller, scannerTimeout, false); } catch (UnknownScannerException e) { // We used to catch this error, interpret, and rethrow. However, we // have since decided that it's not nice for a scanner's close to diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java new file mode 100644 index 00000000000..e3f9ec3fd10 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; +import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; +import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; + +/** + * ClientSimpleScanner implements a sync scanner behaviour. The cache is a simple list. The prefetch + * is invoked only when the application finished processing the entire cache. + */ +@InterfaceAudience.Private +public class ClientSimpleScanner extends ClientScanner { + public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name, + ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcControllerFactory, ExecutorService pool, + int replicaCallTimeoutMicroSecondScan) throws IOException { + super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, + replicaCallTimeoutMicroSecondScan); + } + + @Override + protected boolean setNewStartKey() { + if (noMoreResultsForScan(scan, currentRegion)) { + return false; + } + scan.withStartRow(currentRegion.getEndKey(), true); + return true; + } + + @Override + protected ScannerCallable createScannerCallable() { + if (!scan.includeStartRow() && !isEmptyStartRow(scan.getStartRow())) { + // we have not implemented locate to next row for sync client yet, so here we change the + // inclusive of start row to true. + scan.withStartRow(createClosestRowAfter(scan.getStartRow()), true); + } + return new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, + this.rpcControllerFactory); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index c10f8939274..60d4217eb1c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -1262,7 +1262,7 @@ class ConnectionManager { Scan s = new Scan(); s.setReversed(true); - s.setStartRow(metaKey); + s.withStartRow(metaKey); s.setOneRowLimit(); if (this.useMetaReplicas) { s.setConsistency(Consistency.TIMELINE); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index e7b41142e39..8f496c96b85 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; +import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; + import com.google.common.annotations.VisibleForTesting; import java.io.IOException; @@ -27,6 +30,7 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -209,12 +213,9 @@ public class ConnectionUtils { } /** - * Create the closest row before the specified row - * @deprecated in fact, we do not know the closest row before the given row, the result is only a - * row very close to the current row. Avoid using this method in the future. + * Create a row before the specified row and very close to the specified row. */ - @Deprecated - static byte[] createClosestRowBefore(byte[] row) { + static byte[] createCloseRowBefore(byte[] row) { if (row.length == 0) { return MAX_BYTE_ARRAY; } @@ -228,4 +229,38 @@ public class ConnectionUtils { return nextRow; } } + + static boolean isEmptyStartRow(byte[] row) { + return Bytes.equals(row, EMPTY_START_ROW); + } + + static boolean isEmptyStopRow(byte[] row) { + return Bytes.equals(row, EMPTY_END_ROW); + } + + static boolean noMoreResultsForScan(Scan scan, HRegionInfo info) { + if (isEmptyStopRow(info.getEndKey())) { + return true; + } + if (isEmptyStopRow(scan.getStopRow())) { + return false; + } + int c = Bytes.compareTo(info.getEndKey(), scan.getStopRow()); + // 1. if our stop row is less than the endKey of the region + // 2. if our stop row is equal to the endKey of the region and we do not include the stop row + // for scan. + return c > 0 || (c == 0 && !scan.includeStopRow()); + } + + static boolean noMoreResultsForReverseScan(Scan scan, HRegionInfo info) { + if (isEmptyStartRow(info.getStartKey())) { + return true; + } + if (isEmptyStopRow(scan.getStopRow())) { + return false; + } + // no need to test the inclusive of the stop row as the start key of a region is included in + // the region. + return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 1e3a900c2f0..45ab7decb70 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -799,7 +799,7 @@ public class HTable implements HTableInterface, RegionLocator { this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { - return new ClientScanner(getConfiguration(), scan, getName(), this.connection, + return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index edb66c5a1d7..d67f93647e0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -18,32 +18,25 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; +import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; import java.io.IOException; import java.util.concurrent.ExecutorService; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ExceptionUtil; /** * A reversed client scanner which support backward scanning */ @InterfaceAudience.Private public class ReversedClientScanner extends ClientScanner { - private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class); /** - * Create a new ReversibleClientScanner for the specified table Note that the - * passed {@link Scan}'s start row maybe changed. + * Create a new ReversibleClientScanner for the specified table Note that the passed + * {@link Scan}'s start row maybe changed. * @param conf * @param scan * @param tableName @@ -52,111 +45,26 @@ public class ReversedClientScanner extends ClientScanner { * @param primaryOperationTimeout * @throws IOException */ - public ReversedClientScanner(Configuration conf, Scan scan, - TableName tableName, ClusterConnection connection, - RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, - ExecutorService pool, int primaryOperationTimeout) throws IOException { + public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName, + ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, + RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) + throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, primaryOperationTimeout); } @Override - protected Result[] nextScanner(int nbRows) throws IOException { - // Close the previous scanner if it's open - closeScanner(); - - // Where to start the next scanner - byte[] localStartKey; - boolean locateTheClosestFrontRow = true; - // if we're at start of table, close and return false to stop iterating - if (this.currentRegion != null) { - byte[] startKey = this.currentRegion.getStartKey(); - if (startKey == null || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) - || checkScanStopRow(startKey)) { - close(); - if (LOG.isDebugEnabled()) { - LOG.debug("Finished " + this.currentRegion); - } - return null; - } - localStartKey = startKey; - // clear mvcc read point if we are going to switch regions - scan.resetMvccReadPoint(); - if (LOG.isDebugEnabled()) { - LOG.debug("Finished " + this.currentRegion); - } - } else { - localStartKey = this.scan.getStartRow(); - if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) { - locateTheClosestFrontRow = false; - } + protected boolean setNewStartKey() { + if (noMoreResultsForReverseScan(scan, currentRegion)) { + return false; } - - if (LOG.isDebugEnabled() && this.currentRegion != null) { - // Only worth logging if NOT first region in scan. - LOG.debug("Advancing internal scanner to startKey at '" - + Bytes.toStringBinary(localStartKey) + "'"); - } - try { - // In reversed scan, we want to locate the previous region through current - // region's start key. In order to get that previous region, first we - // create a closest row before the start key of current region, then - // locate all the regions from the created closest row to start key of - // current region, thus the last one of located regions should be the - // previous region of current region. The related logic of locating - // regions is implemented in ReversedScannerCallable - byte[] locateStartRow = locateTheClosestFrontRow ? createClosestRowBefore(localStartKey) - : null; - callable = getScannerCallable(localStartKey, nbRows, locateStartRow); - // Open a scanner on the region server starting at the - // beginning of the region - // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, - // we do a callWithRetries - Result[] rrs = this.caller.callWithoutRetries(callable, scannerTimeout); - this.currentRegion = callable.getHRegionInfo(); - if (this.scanMetrics != null) { - this.scanMetrics.countOfRegions.incrementAndGet(); - } - if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) { - // no results for the scan, return null to terminate the scan. - return null; - } - return rrs; - } catch (IOException e) { - ExceptionUtil.rethrowIfInterrupt(e); - close(); - throw e; - } - } - - protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey, - int nbRows, byte[] locateStartRow) { - scan.setStartRow(localStartKey); - ScannerCallable s = - new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, - locateStartRow, this.rpcControllerFactory); - s.setCaching(nbRows); - ScannerCallableWithReplicas sr = - new ScannerCallableWithReplicas(getTable(), getConnection(), s, pool, - primaryOperationTimeout, scan, getRetries(), getScannerTimeout(), caching, getConf(), - caller); - return sr; + scan.withStartRow(currentRegion.getStartKey(), false); + return true; } @Override - // returns true if stopRow >= passed region startKey - protected boolean checkScanStopRow(final byte[] startKey) { - if (this.scan.getStopRow().length > 0) { - // there is a stop row, check to see if we are past it. - byte[] stopRow = scan.getStopRow(); - int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, startKey, 0, - startKey.length); - if (cmp >= 0) { - // stopRow >= startKey (stopRow is equals to or larger than endKey) - // This is a stop. - return true; - } - } - return false; // unlikely. + protected ReversedScannerCallable createScannerCallable() { + return new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, + this.rpcControllerFactory); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index 840af97f90c..cbf2de001cc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -18,13 +18,14 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore; +import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; @@ -41,26 +42,18 @@ import org.apache.hadoop.hbase.util.Bytes; */ @InterfaceAudience.Private public class ReversedScannerCallable extends ScannerCallable { - /** - * The start row for locating regions. In reversed scanner, may locate the - * regions for a range of keys when doing - * {@link ReversedClientScanner#nextScanner(int)} - */ - protected final byte[] locateStartRow; /** * @param connection * @param tableName * @param scan * @param scanMetrics - * @param locateStartRow The start row for locating regions - * @param rpcFactory to create an - * {@link com.google.protobuf.RpcController} to talk to the regionserver + * @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the + * regionserver */ public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, - ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory) { + ScanMetrics scanMetrics, RpcControllerFactory rpcFactory) { super(connection, tableName, scan, scanMetrics, rpcFactory); - this.locateStartRow = locateStartRow; } /** @@ -68,26 +61,13 @@ public class ReversedScannerCallable extends ScannerCallable { * @param tableName * @param scan * @param scanMetrics - * @param locateStartRow The start row for locating regions - * @param rpcFactory to create an - * {@link com.google.protobuf.RpcController} to talk to the regionserver + * @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the + * regionserver * @param replicaId the replica id */ public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, - ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory, int replicaId) { + ScanMetrics scanMetrics, RpcControllerFactory rpcFactory, int replicaId) { super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId); - this.locateStartRow = locateStartRow; - } - - /** - * @deprecated use - * {@link #ReversedScannerCallable(ClusterConnection, TableName, Scan, ScanMetrics, byte[], RpcControllerFactory )} - */ - @Deprecated - public ReversedScannerCallable(ClusterConnection connection, TableName tableName, - Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) { - this(connection, tableName, scan, scanMetrics, locateStartRow, RpcControllerFactory - .instantiate(connection.getConfiguration())); } /** @@ -100,12 +80,15 @@ public class ReversedScannerCallable extends ScannerCallable { throw new InterruptedIOException(); } if (!instantiated || reload) { - if (locateStartRow == null) { + // we should use range locate if + // 1. we do not want the start row + // 2. the start row is empty which means we need to locate to the last region. + if (scan.includeStartRow() && !isEmptyStartRow(getRow())) { // Just locate the region with the row RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id, getConnection(), tableName, row); this.location = id < rl.size() ? rl.getRegionLocation(id) : null; - if (this.location == null) { + if (location == null || location.getServerName() == null) { throw new IOException("Failed to find location, tableName=" + tableName + ", row=" + Bytes.toStringBinary(row) + ", reload=" + reload); @@ -113,6 +96,7 @@ public class ReversedScannerCallable extends ScannerCallable { } else { // Need to locate the regions with the range, and the target location is // the last one which is the previous region of last region scanner + byte[] locateStartRow = createCloseRowBefore(getRow()); List locatedRegions = locateRegionsInRange( locateStartRow, row, reload); if (locatedRegions.isEmpty()) { @@ -181,8 +165,8 @@ public class ReversedScannerCallable extends ScannerCallable { @Override public ScannerCallable getScannerCallableForReplica(int id) { - ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, this.tableName, - this.getScan(), this.scanMetrics, this.locateStartRow, controllerFactory, id); + ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), getTableName(), + this.getScan(), this.scanMetrics, controllerFactory, id); r.setCaching(this.getCaching()); return r; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 84f1ca92e2d..70be1d14015 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -86,8 +86,10 @@ public class Scan extends Query { private static final String RAW_ATTR = "_raw_"; - private byte [] startRow = HConstants.EMPTY_START_ROW; - private byte [] stopRow = HConstants.EMPTY_END_ROW; + private byte[] startRow = HConstants.EMPTY_START_ROW; + private boolean includeStartRow = true; + private byte[] stopRow = HConstants.EMPTY_END_ROW; + private boolean includeStopRow = false; private int maxVersions = 1; private int batch = -1; @@ -190,7 +192,11 @@ public class Scan extends Query { */ public Scan() {} - public Scan(byte [] startRow, Filter filter) { + /** + * @deprecated use {@code new Scan().withStartRow(startRow).setFilter(filter)} instead. + */ + @Deprecated + public Scan(byte[] startRow, Filter filter) { this(startRow); this.filter = filter; } @@ -198,24 +204,26 @@ public class Scan extends Query { /** * Create a Scan operation starting at the specified row. *

- * If the specified row does not exist, the Scanner will start from the - * next closest row after the specified row. + * If the specified row does not exist, the Scanner will start from the next closest row after the + * specified row. * @param startRow row to start scanner at or after + * @deprecated use {@code new Scan().withStartRow(startRow)} instead. */ - public Scan(byte [] startRow) { - this.startRow = startRow; + @Deprecated + public Scan(byte[] startRow) { + setStartRow(startRow); } /** * Create a Scan operation for the range of rows specified. * @param startRow row to start scanner at or after (inclusive) * @param stopRow row to stop scanner before (exclusive) + * @deprecated use {@code new Scan().withStartRow(startRow).withStopRow(stopRow)} instead. */ - public Scan(byte [] startRow, byte [] stopRow) { - this.startRow = startRow; - this.stopRow = stopRow; - //if the startRow and stopRow both are empty, it is not a Get - this.getScan = isStartRowAndEqualsStopRow(); + @Deprecated + public Scan(byte[] startRow, byte[] stopRow) { + setStartRow(startRow); + setStopRow(stopRow); } /** @@ -226,7 +234,9 @@ public class Scan extends Query { */ public Scan(Scan scan) throws IOException { startRow = scan.getStartRow(); + includeStartRow = scan.includeStartRow(); stopRow = scan.getStopRow(); + includeStopRow = scan.includeStopRow(); maxVersions = scan.getMaxVersions(); batch = scan.getBatch(); storeLimit = scan.getMaxResultsPerColumnFamily(); @@ -273,7 +283,9 @@ public class Scan extends Query { */ public Scan(Get get) { this.startRow = get.getRow(); + this.includeStartRow = true; this.stopRow = get.getRow(); + this.includeStopRow = true; this.filter = get.getFilter(); this.cacheBlocks = get.getCacheBlocks(); this.maxVersions = get.getMaxVersions(); @@ -295,13 +307,13 @@ public class Scan extends Query { } public boolean isGetScan() { - return this.getScan || isStartRowAndEqualsStopRow(); + return includeStartRow && includeStopRow && areStartRowAndStopRowEqual(startRow, stopRow); } - private boolean isStartRowAndEqualsStopRow() { - return this.startRow != null && this.startRow.length > 0 && - Bytes.equals(this.startRow, this.stopRow); + private static boolean areStartRowAndStopRowEqual(byte[] startRow, byte[] stopRow) { + return startRow != null && startRow.length > 0 && Bytes.equals(startRow, stopRow); } + /** * Get all columns from the specified family. *

@@ -378,44 +390,124 @@ public class Scan extends Query { return (Scan) super.setColumnFamilyTimeRange(cf, minStamp, maxStamp); } + /** * Set the start row of the scan. - * @param startRow row to start scan on (inclusive) - * Note: In order to make startRow exclusive add a trailing 0 byte + *

+ * If the specified row does not exist, the Scanner will start from the next closest row after the + * specified row. + * @param startRow row to start scanner at or after * @return this - * @throws IllegalArgumentException if startRow does not meet criteria - * for a row key (when length exceeds {@link HConstants#MAX_ROW_LENGTH}) + * @throws IllegalArgumentException if startRow does not meet criteria for a row key (when length + * exceeds {@link HConstants#MAX_ROW_LENGTH}) + * @deprecated use {@link #withStartRow(byte[])} instead. This method may change the inclusive of + * the stop row to keep compatible with the old behavior. */ - public Scan setStartRow(byte [] startRow) { - if (Bytes.len(startRow) > HConstants.MAX_ROW_LENGTH) { - throw new IllegalArgumentException( - "startRow's length must be less than or equal to " + - HConstants.MAX_ROW_LENGTH + " to meet the criteria" + - " for a row key."); + @Deprecated + public Scan setStartRow(byte[] startRow) { + withStartRow(startRow); + if (areStartRowAndStopRowEqual(startRow, stopRow)) { + // for keeping the old behavior that a scan with the same start and stop row is a get scan. + this.includeStopRow = true; } - this.startRow = startRow; return this; } /** - * Set the stop row. - * @param stopRow row to end at (exclusive) - *

Note: In order to make stopRow inclusive add a trailing 0 byte

- *

Note: When doing a filter for a rowKey Prefix - * use {@link #setRowPrefixFilter(byte[])}. - * The 'trailing 0' will not yield the desired result.

+ * Set the start row of the scan. + *

+ * If the specified row does not exist, the Scanner will start from the next closest row after the + * specified row. + * @param startRow row to start scanner at or after * @return this - * @throws IllegalArgumentException if stopRow does not meet criteria - * for a row key (when length exceeds {@link HConstants#MAX_ROW_LENGTH}) + * @throws IllegalArgumentException if startRow does not meet criteria for a row key (when length + * exceeds {@link HConstants#MAX_ROW_LENGTH}) */ - public Scan setStopRow(byte [] stopRow) { + public Scan withStartRow(byte[] startRow) { + return withStartRow(startRow, true); + } + + /** + * Set the start row of the scan. + *

+ * If the specified row does not exist, or the {@code inclusive} is {@code false}, the Scanner + * will start from the next closest row after the specified row. + * @param startRow row to start scanner at or after + * @param inclusive whether we should include the start row when scan + * @return this + * @throws IllegalArgumentException if startRow does not meet criteria for a row key (when length + * exceeds {@link HConstants#MAX_ROW_LENGTH}) + */ + public Scan withStartRow(byte[] startRow, boolean inclusive) { + if (Bytes.len(startRow) > HConstants.MAX_ROW_LENGTH) { + throw new IllegalArgumentException("startRow's length must be less than or equal to " + + HConstants.MAX_ROW_LENGTH + " to meet the criteria" + " for a row key."); + } + this.startRow = startRow; + this.includeStartRow = inclusive; + return this; + } + + /** + * Set the stop row of the scan. + *

+ * The scan will include rows that are lexicographically less than the provided stopRow. + *

+ * Note: When doing a filter for a rowKey Prefix use + * {@link #setRowPrefixFilter(byte[])}. The 'trailing 0' will not yield the desired result. + *

+ * @param stopRow row to end at (exclusive) + * @return this + * @throws IllegalArgumentException if stopRow does not meet criteria for a row key (when length + * exceeds {@link HConstants#MAX_ROW_LENGTH}) + * @deprecated use {@link #withStartRow(byte[])} instead. This method may change the inclusive of + * the stop row to keep compatible with the old behavior. + */ + @Deprecated + public Scan setStopRow(byte[] stopRow) { + withStopRow(stopRow); + if (areStartRowAndStopRowEqual(startRow, stopRow)) { + // for keeping the old behavior that a scan with the same start and stop row is a get scan. + this.includeStopRow = true; + } + return this; + } + + /** + * Set the stop row of the scan. + *

+ * The scan will include rows that are lexicographically less than the provided stopRow. + *

+ * Note: When doing a filter for a rowKey Prefix use + * {@link #setRowPrefixFilter(byte[])}. The 'trailing 0' will not yield the desired result. + *

+ * @param stopRow row to end at (exclusive) + * @return this + * @throws IllegalArgumentException if stopRow does not meet criteria for a row key (when length + * exceeds {@link HConstants#MAX_ROW_LENGTH}) + */ + public Scan withStopRow(byte[] stopRow) { + return withStopRow(stopRow, false); + } + + /** + * Set the stop row of the scan. + *

+ * The scan will include rows that are lexicographically less than (or equal to if + * {@code inclusive} is {@code true}) the provided stopRow. + * @param stopRow row to end at + * @param inclusive whether we should include the stop row when scan + * @return this + * @throws IllegalArgumentException if stopRow does not meet criteria for a row key (when length + * exceeds {@link HConstants#MAX_ROW_LENGTH}) + */ + public Scan withStopRow(byte[] stopRow, boolean inclusive) { if (Bytes.len(stopRow) > HConstants.MAX_ROW_LENGTH) { - throw new IllegalArgumentException( - "stopRow's length must be less than or equal to " + - HConstants.MAX_ROW_LENGTH + " to meet the criteria" + - " for a row key."); + throw new IllegalArgumentException("stopRow's length must be less than or equal to " + + HConstants.MAX_ROW_LENGTH + " to meet the criteria" + " for a row key."); } this.stopRow = stopRow; + this.includeStopRow = inclusive; return this; } @@ -629,13 +721,27 @@ public class Scan extends Query { return this.startRow; } + /** + * @return if we should include start row when scan + */ + public boolean includeStartRow() { + return includeStartRow; + } + /** * @return the stoprow */ - public byte [] getStopRow() { + public byte[] getStopRow() { return this.stopRow; } + /** + * @return if we should include stop row when scan + */ + public boolean includeStopRow() { + return includeStopRow; + } + /** * @return the max number of versions to fetch */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 55be6da5fab..fd884e33f95 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -71,7 +71,7 @@ public class ScannerCallable extends RegionServerCallable { protected boolean instantiated = false; protected boolean closed = false; protected boolean renew = false; - private Scan scan; + protected final Scan scan; private int caching = 1; protected final ClusterConnection cConnection; protected ScanMetrics scanMetrics; @@ -87,7 +87,6 @@ public class ScannerCallable extends RegionServerCallable { private MoreResults moreResultsInRegion; private MoreResults moreResultsForScan; - private boolean openScanner; /** * Saves whether or not the most recent response from the server was a heartbeat message. * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()} @@ -270,10 +269,8 @@ public class ScannerCallable extends RegionServerCallable { controller.setCallTimeout(callTimeout); ScanResponse response; if (this.scannerId == -1L) { - this.openScanner = true; response = openScanner(); } else { - this.openScanner = false; response = next(); } long timestamp = System.currentTimeMillis(); @@ -491,12 +488,4 @@ public class ScannerCallable extends RegionServerCallable { void setMoreResultsForScan(MoreResults moreResults) { this.moreResultsForScan = moreResults; } - - /** - * Whether the previous call is openScanner. This is used to keep compatible with the old - * implementation that we always returns empty result for openScanner. - */ - boolean isOpenScanner() { - return openScanner; - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 46c8f9c0c27..e64baf58b6f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -18,9 +18,6 @@ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; - import com.google.common.annotations.VisibleForTesting; import java.io.IOException; @@ -122,10 +119,6 @@ class ScannerCallableWithReplicas implements RetryingCallable { return currentScannerCallable.moreResultsForScan(); } - public boolean isOpenScanner() { - return currentScannerCallable.isOpenScanner(); - } - @Override public Result [] call(int timeout) throws IOException { // If the active replica callable was closed somewhere, invoke the RPC to @@ -322,24 +315,17 @@ class ScannerCallableWithReplicas implements RetryingCallable { * @param callable The callable to set the start row on */ private void setStartRowForReplicaCallable(ScannerCallable callable) { - if (this.lastResult == null || callable == null) return; - - if (this.lastResult.isPartial()) { - // The last result was a partial result which means we have not received all of the cells - // for this row. Thus, use the last result's row as the start row. If a replica switch - // occurs, the scanner will ensure that any accumulated partial results are cleared, - // and the scan can resume from this row. - callable.getScan().setStartRow(this.lastResult.getRow()); - } else { - // The last result was not a partial result which means it contained all of the cells for - // that row (we no longer need any information from it). Set the start row to the next - // closest row that could be seen. - if (callable.getScan().isReversed()) { - callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow())); - } else { - callable.getScan().setStartRow(createClosestRowAfter(this.lastResult.getRow())); - } + if (this.lastResult == null || callable == null) { + return; } + // 1. The last result was a partial result which means we have not received all of the cells + // for this row. Thus, use the last result's row as the start row. If a replica switch + // occurs, the scanner will ensure that any accumulated partial results are cleared, + // and the scan can resume from this row. + // 2. The last result was not a partial result which means it contained all of the cells for + // that row (we no longer need any information from it). Set the start row to the next + // closest row that could be seen. + callable.getScan().withStartRow(this.lastResult.getRow(), this.lastResult.isPartial()); } @VisibleForTesting diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index b3e40a39652..fc8c0306b95 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -1047,6 +1047,12 @@ public final class ProtobufUtil { if (mvccReadPoint > 0) { scanBuilder.setMvccReadPoint(mvccReadPoint); } + if (!scan.includeStartRow()) { + scanBuilder.setIncludeStartRow(false); + } + if (scan.includeStopRow()) { + scanBuilder.setIncludeStopRow(true); + } if (scan.getReadType() != Scan.ReadType.DEFAULT) { scanBuilder.setReadType(toReadType(scan.getReadType())); } @@ -1062,15 +1068,24 @@ public final class ProtobufUtil { */ public static Scan toScan( final ClientProtos.Scan proto) throws IOException { - byte [] startRow = HConstants.EMPTY_START_ROW; - byte [] stopRow = HConstants.EMPTY_END_ROW; + byte[] startRow = HConstants.EMPTY_START_ROW; + byte[] stopRow = HConstants.EMPTY_END_ROW; + boolean includeStartRow = true; + boolean includeStopRow = false; if (proto.hasStartRow()) { startRow = proto.getStartRow().toByteArray(); } if (proto.hasStopRow()) { stopRow = proto.getStopRow().toByteArray(); } - Scan scan = new Scan(startRow, stopRow); + if (proto.hasIncludeStartRow()) { + includeStartRow = proto.getIncludeStartRow(); + } + if (proto.hasIncludeStopRow()) { + includeStopRow = proto.getIncludeStopRow(); + } + Scan scan = + new Scan().withStartRow(startRow, includeStartRow).withStopRow(stopRow, includeStopRow); if (proto.hasCacheBlocks()) { scan.setCacheBlocks(proto.getCacheBlocks()); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index 88c32ac15fd..1a66fbe2bd8 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -86,7 +86,7 @@ public class TestClientScanner { } } - private static class MockClientScanner extends ClientScanner { + private static class MockClientScanner extends ClientSimpleScanner { private boolean rpcFinished = false; private boolean rpcFinishedFired = false; @@ -101,36 +101,21 @@ public class TestClientScanner { } @Override - protected Result[] nextScanner(int nbRows) throws IOException { + protected boolean moveToNextRegion() { if (!initialized) { initialized = true; - return super.nextScanner(nbRows); + return super.moveToNextRegion(); } if (!rpcFinished) { - return super.nextScanner(nbRows); + return super.moveToNextRegion(); } - // Enforce that we don't short-circuit more than once if (rpcFinishedFired) { throw new RuntimeException("Expected nextScanner to only be called once after " + " short-circuit was triggered."); } rpcFinishedFired = true; - return null; - } - - @Override - protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey, - int nbRows) { - scan.setStartRow(localStartKey); - ScannerCallable s = - new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, - this.rpcControllerFactory); - s.setCaching(nbRows); - ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(getTable(), getConnection(), - s, pool, primaryOperationTimeout, scan, - getRetries(), scannerTimeout, caching, conf, caller); - return sr; + return false; } public void setRpcFinished(boolean rpcFinished) { @@ -164,7 +149,7 @@ public class TestClientScanner { case 1: // detect no more results case 2: // close count++; - return null; + return new Result[0]; default: throw new RuntimeException("Expected only 2 invocations"); } @@ -184,10 +169,9 @@ public class TestClientScanner { scanner.loadCache(); - // One for initializeScannerInConstruction() // One for fetching the results - // One for fetching null results and quit as we do not have moreResults hint. - inOrder.verify(caller, Mockito.times(3)).callWithoutRetries( + // One for fetching empty results and quit as we do not have moreResults hint. + inOrder.verify(caller, Mockito.times(2)).callWithoutRetries( Mockito.any(RetryingCallable.class), Mockito.anyInt()); assertEquals(1, scanner.cache.size()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index a9b3d0ff669..04e95e4aa4b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -179,8 +179,8 @@ class DefaultStoreFileManager implements StoreFileManager { } @Override - public final Collection getFilesForScanOrGet(boolean isGet, - byte[] startRow, byte[] stopRow) { + public final Collection getFilesForScanOrGet(byte[] startRow, boolean includeStartRow, + byte[] stopRow, boolean includeStopRow) { // We cannot provide any useful input and already have the files sorted by seqNum. return getStorefiles(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 1f648f723ae..e804f64265e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5849,8 +5849,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected Cell joinedContinuationRow = null; private boolean filterClosed = false; - protected final int isScan; protected final byte[] stopRow; + protected final boolean includeStopRow; protected final HRegion region; private final long readPt; @@ -5880,15 +5880,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build(); - if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) { - this.stopRow = null; - } else { - this.stopRow = scan.getStopRow(); - } - // If we are doing a get, we want to be [startRow,endRow] normally - // it is [startRow,endRow) and if startRow=endRow we get nothing. - this.isScan = scan.isGetScan() ? -1 : 0; - + this.stopRow = scan.getStopRow(); + this.includeStopRow = scan.includeStopRow(); // synchronize on scannerReadPoints so that nobody calculates // getSmallestReadPoint, before scannerReadPoints is updated. IsolationLevel isolationLevel = scan.getIsolationLevel(); @@ -6222,7 +6215,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi length = current.getRowLength(); } - boolean stopRow = isStopRow(currentRow, offset, length); + boolean shouldStop = shouldStop(current); // When has filter row is true it means that the all the cells for a particular row must be // read before a filtering decision can be made. This means that filters where hasFilterRow // run the risk of encountering out of memory errors in the case that they are applied to a @@ -6246,7 +6239,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If not, then it's main path - getting results from storeHeap. if (joinedContinuationRow == null) { // First, check if we are at a stop row. If so, there are no more results. - if (stopRow) { + if (shouldStop) { if (hasFilterRow) { filter.filterRowCells(results); } @@ -6286,8 +6279,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } Cell nextKv = this.storeHeap.peek(); - stopRow = nextKv == null || - isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength()); + shouldStop = shouldStop(nextKv); // save that the row was empty before filters applied to it. final boolean isEmptyRow = results.isEmpty(); @@ -6324,7 +6316,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // This row was totally filtered out, if this is NOT the last row, // we should continue on. Otherwise, nothing else to do. - if (!stopRow) continue; + if (!shouldStop) continue; return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } @@ -6365,10 +6357,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } - if (!stopRow) continue; + if (!shouldStop) continue; } - if (stopRow) { + if (shouldStop) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } else { return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); @@ -6451,11 +6443,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi .postScannerFilterRow(this, currentRow, offset, length); } - protected boolean isStopRow(byte[] currentRow, int offset, short length) { - return currentRow == null || - (stopRow != null && - comparator.compareRows(stopRow, 0, stopRow.length, - currentRow, offset, length) <= isScan); + protected boolean shouldStop(Cell currentRowCell) { + if (currentRowCell == null) { + return true; + } + if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_START_ROW)) { + return false; + } + int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length); + return c > 0 || (c == 0 && !includeStopRow); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index b2cc3a8da83..4032a195196 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1169,8 +1169,13 @@ public class HStore implements Store { List memStoreScanners; this.lock.readLock().lock(); try { - storeFilesToScan = - this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow); + // As in branch-1 we need to support JDK7 so we can not add default methods to the Store + // interface, but add new methods directly in interface will break the compatibility, so here + // we always pass true to StoreFileManager to include more files. And for now, there is no + // performance issue as the DefaultStoreFileManager just returns all storefile, and + // StripeStoreFileManager just ignores the inclusive hints. + storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScanOrGet(startRow, true, + stopRow, true); memStoreScanners = this.memstore.getScanners(readPt); } finally { this.lock.readLock().unlock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java index 63c1ca6069b..7eec632c876 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java @@ -21,10 +21,13 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.List; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; +import org.apache.hadoop.hbase.util.Bytes; /** * ReversibleRegionScannerImpl extends from RegionScannerImpl, and is used to @@ -55,11 +58,15 @@ class ReversedRegionScannerImpl extends RegionScannerImpl { } } - @Override - protected boolean isStopRow(byte[] currentRow, int offset, short length) { - return currentRow == null - || (super.stopRow != null && region.getComparator().compareRows( - stopRow, 0, stopRow.length, currentRow, offset, length) >= super.isScan); + protected boolean shouldStop(Cell currentRowCell) { + if (currentRowCell == null) { + return true; + } + if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_START_ROW)) { + return false; + } + int c = region.getComparator().compareRows(currentRowCell, stopRow, 0, stopRow.length); + return c < 0 || (c == 0 && !includeStopRow); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java index cb192678c6f..0c0b14a251e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java @@ -105,14 +105,12 @@ public interface StoreFileManager { /** * Gets the store files to scan for a Scan or Get request. - * @param isGet Whether it's a get. * @param startRow Start row of the request. * @param stopRow Stop row of the request. * @return The list of files that are to be read for this request. */ - Collection getFilesForScanOrGet( - boolean isGet, byte[] startRow, byte[] stopRow - ); + Collection getFilesForScanOrGet(byte[] startRow, boolean includeStartRow, + byte[] stopRow, boolean includeStopRow); /** * Gets initial, full list of candidate store files to check for row-key-before. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java index cd7288d6327..5f13985d8ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java @@ -287,8 +287,8 @@ public class StripeStoreFileManager } @Override - public Collection getFilesForScanOrGet( - boolean isGet, byte[] startRow, byte[] stopRow) { + public Collection getFilesForScanOrGet(byte[] startRow, boolean includeStartRow, + byte[] stopRow, boolean includeStopRow) { if (state.stripeFiles.isEmpty()) { return state.level0Files; // There's just L0. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java index d3224dceab1..49e5f42deab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java @@ -45,7 +45,7 @@ public abstract class CompactionScanQueryMatcher extends ScanQueryMatcher { protected CompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes, long readPointToUse, long oldestUnexpiredTS, long now) { - super(HConstants.EMPTY_START_ROW, scanInfo, + super(createStartKeyFromRow(HConstants.EMPTY_START_ROW, scanInfo), scanInfo, new ScanWildcardColumnTracker(scanInfo.getMinVersions(), scanInfo.getMaxVersions(), oldestUnexpiredTS), oldestUnexpiredTS, now); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java index 6827c6223a5..ae13db2d88b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java @@ -116,7 +116,8 @@ public class LegacyScanQueryMatcher extends ScanQueryMatcher { private LegacyScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns, boolean hasNullColumn, DeleteTracker deletes, ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now) { - super(scan.getStartRow(), scanInfo, columns, oldestUnexpiredTS, now); + super(createStartKeyFromRow(scan.getStartRow(), scanInfo), scanInfo, columns, oldestUnexpiredTS, + now); TimeRange timeRange = scan.getColumnFamilyTimeRange().get(scanInfo.getFamily()); if (timeRange == null) { this.tr = scan.getTimeRange(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java index 894bbec87f7..8f5059fe4f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo; * Query matcher for normal user scan. */ @InterfaceAudience.Private -public class NormalUserScanQueryMatcher extends UserScanQueryMatcher { +public abstract class NormalUserScanQueryMatcher extends UserScanQueryMatcher { /** Keeps track of deletes */ private final DeleteTracker deletes; @@ -91,17 +91,45 @@ public class NormalUserScanQueryMatcher extends UserScanQueryMatcher { RegionCoprocessorHost regionCoprocessorHost) throws IOException { DeleteTracker deletes = instantiateDeleteTracker(regionCoprocessorHost); if (scan.isReversed()) { - return new NormalUserScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, deletes, - oldestUnexpiredTS, now) { + if (scan.includeStopRow()) { + return new NormalUserScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, deletes, + oldestUnexpiredTS, now) { - @Override - protected boolean moreRowsMayExistsAfter(int cmpToStopRow) { - return cmpToStopRow > 0; - } - }; + @Override + protected boolean moreRowsMayExistsAfter(int cmpToStopRow) { + return cmpToStopRow >= 0; + } + }; + } else { + return new NormalUserScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, deletes, + oldestUnexpiredTS, now) { + + @Override + protected boolean moreRowsMayExistsAfter(int cmpToStopRow) { + return cmpToStopRow > 0; + } + }; + } } else { - return new NormalUserScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, deletes, - oldestUnexpiredTS, now); + if (scan.includeStopRow()) { + return new NormalUserScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, deletes, + oldestUnexpiredTS, now) { + + @Override + protected boolean moreRowsMayExistsAfter(int cmpToStopRow) { + return cmpToStopRow <= 0; + } + }; + } else { + return new NormalUserScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, deletes, + oldestUnexpiredTS, now) { + + @Override + protected boolean moreRowsMayExistsAfter(int cmpToStopRow) { + return cmpToStopRow < 0; + } + }; + } } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/RawScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/RawScanQueryMatcher.java index 84484ed3eb4..b1f20e27dee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/RawScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/RawScanQueryMatcher.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo; * Query matcher for raw scan. */ @InterfaceAudience.Private -public class RawScanQueryMatcher extends UserScanQueryMatcher { +public abstract class RawScanQueryMatcher extends UserScanQueryMatcher { protected RawScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns, boolean hasNullColumn, long oldestUnexpiredTS, long now) { @@ -63,17 +63,45 @@ public class RawScanQueryMatcher extends UserScanQueryMatcher { public static RawScanQueryMatcher create(Scan scan, ScanInfo scanInfo, ColumnTracker columns, boolean hasNullColumn, long oldestUnexpiredTS, long now) { if (scan.isReversed()) { - return new RawScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, oldestUnexpiredTS, - now) { + if (scan.includeStopRow()) { + return new RawScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, oldestUnexpiredTS, + now) { - @Override - protected boolean moreRowsMayExistsAfter(int cmpToStopRow) { - return cmpToStopRow > 0; - } - }; + @Override + protected boolean moreRowsMayExistsAfter(int cmpToStopRow) { + return cmpToStopRow >= 0; + } + }; + } else { + return new RawScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, oldestUnexpiredTS, + now) { + + @Override + protected boolean moreRowsMayExistsAfter(int cmpToStopRow) { + return cmpToStopRow > 0; + } + }; + } } else { - return new RawScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, oldestUnexpiredTS, - now); + if (scan.includeStopRow()) { + return new RawScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, oldestUnexpiredTS, + now) { + + @Override + protected boolean moreRowsMayExistsAfter(int cmpToStopRow) { + return cmpToStopRow <= 0; + } + }; + } else { + return new RawScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, oldestUnexpiredTS, + now) { + + @Override + protected boolean moreRowsMayExistsAfter(int cmpToStopRow) { + return cmpToStopRow < 0; + } + }; + } } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java index 127e07b093f..b47c578af60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java @@ -119,15 +119,19 @@ public abstract class ScanQueryMatcher { protected boolean stickyNextRow; - protected ScanQueryMatcher(byte[] startRow, ScanInfo scanInfo, ColumnTracker columns, + protected ScanQueryMatcher(Cell startKey, ScanInfo scanInfo, ColumnTracker columns, long oldestUnexpiredTS, long now) { this.rowComparator = scanInfo.getComparator(); - this.startKey = KeyValueUtil.createFirstDeleteFamilyOnRow(startRow, scanInfo.getFamily()); + this.startKey = startKey; this.oldestUnexpiredTS = oldestUnexpiredTS; this.now = now; this.columns = columns; } + protected static Cell createStartKeyFromRow(byte[] startRow, ScanInfo scanInfo) { + return KeyValueUtil.createFirstDeleteFamilyOnRow(startRow, scanInfo.getFamily()); + } + /** * Check before the delete logic. * @return null means continue. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java index b600d94315e..9984ef3b4df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.NavigableSet; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; @@ -50,9 +51,17 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher { protected final TimeRange tr; + private static Cell createStartKey(Scan scan, ScanInfo scanInfo) { + if (scan.includeStartRow()) { + return createStartKeyFromRow(scan.getStartRow(), scanInfo); + } else { + return KeyValueUtil.createLastOnRow(scan.getStartRow()); + } + } + protected UserScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns, boolean hasNullColumn, long oldestUnexpiredTS, long now) { - super(scan.getStartRow(), scanInfo, columns, oldestUnexpiredTS, now); + super(createStartKey(scan, scanInfo), scanInfo, columns, oldestUnexpiredTS, now); this.hasNullColumn = hasNullColumn; this.filter = scan.getFilter(); this.stopRow = scan.getStopRow(); @@ -159,9 +168,7 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher { protected abstract boolean isGet(); - protected boolean moreRowsMayExistsAfter(int cmpToStopRow) { - return cmpToStopRow < 0; - } + protected abstract boolean moreRowsMayExistsAfter(int cmpToStopRow); @Override public boolean moreRowsMayExistAfter(Cell cell) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java index df38b8e29ac..c537fe0576f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java @@ -161,14 +161,13 @@ public class TestMetaTableAccessorNoCluster { .thenThrow(new ServiceException("Server not running (1 of 3)")) .thenThrow(new ServiceException("Server not running (2 of 3)")) .thenThrow(new ServiceException("Server not running (3 of 3)")) - .thenReturn(ScanResponse.newBuilder().setScannerId(1234567890L).build()) .thenAnswer(new Answer() { public ScanResponse answer(InvocationOnMock invocation) throws Throwable { ((PayloadCarryingRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil .createCellScanner(cellScannables)); - return builder.setScannerId(1234567890L).build(); + return builder.setScannerId(1234567890L).setMoreResults(false).build(); } - }).thenReturn(ScanResponse.newBuilder().setMoreResults(false).build()); + }); // Associate a spied-upon HConnection with UTIL.getConfiguration. Need // to shove this in here first so it gets picked up all over; e.g. by // HTable. @@ -199,8 +198,8 @@ public class TestMetaTableAccessorNoCluster { assertTrue(hris.firstEntry().getKey().equals(HRegionInfo.FIRST_META_REGIONINFO)); assertTrue(Bytes.equals(rowToVerify, hris.firstEntry().getValue().getRow())); // Finally verify that scan was called four times -- three times - // with exception and then on 4th, 5th and 6th attempt we succeed - Mockito.verify(implementation, Mockito.times(6)). + // with exception and then on 4th attempt we succeed + Mockito.verify(implementation, Mockito.times(4)). scan((RpcController)Mockito.any(), (ScanRequest)Mockito.any()); } finally { if (connection != null && !connection.isClosed()) connection.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index a59c041437b..6b831b1eccb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -153,6 +153,7 @@ public class TestFromClientSide { conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MultiRowMutationEndpoint.class.getName()); conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests + conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000); // We need more than one region server in this test TEST_UTIL.startMiniCluster(SLAVES); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index ca8273e4f7b..259aea4d023 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -141,7 +142,7 @@ public class TestScannersFromClientSide { ht.delete(delete); // without batch - scan = new Scan(ROW); + scan = new Scan().withStartRow(ROW); scan.setMaxVersions(); scanner = ht.getScanner(scan); @@ -155,7 +156,7 @@ public class TestScannersFromClientSide { verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); // with batch - scan = new Scan(ROW); + scan = new Scan().withStartRow(ROW); scan.setMaxVersions(); scan.setBatch(2); scanner = ht.getScanner(scan); @@ -266,7 +267,7 @@ public class TestScannersFromClientSide { * @param columns * @throws Exception */ - public void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception { + private void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception { Scan baseScan = new Scan(); baseScan.setReversed(reversed); baseScan.setSmall(true); @@ -293,9 +294,7 @@ public class TestScannersFromClientSide { Result r = null; while ((r = scanner.next()) != null) { rowCount++; - for (Cell c : r.rawCells()) { - cellCount++; - } + cellCount += r.rawCells().length; } assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount, @@ -598,7 +597,7 @@ public class TestScannersFromClientSide { } ht.put(put); - scan = new Scan(ROW); + scan = new Scan().withStartRow(ROW); scanner = ht.getScanner(scan); HRegionLocation loc = ht.getRegionLocation(ROW); @@ -678,5 +677,73 @@ public class TestScannersFromClientSide { assertEquals(expKvList.size(), result.size()); } + private void assertResultEquals(Result result, int i) { + assertEquals(String.format("%02d", i), Bytes.toString(result.getRow())); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + } + private void testStartRowStopRowInclusive(Table table, int start, boolean startInclusive, + int stop, boolean stopInclusive) throws IOException { + int actualStart = startInclusive ? start : start + 1; + int actualStop = stopInclusive ? stop + 1 : stop; + int expectedCount = actualStop - actualStart; + Result[] results; + try (ResultScanner scanner = table.getScanner( + new Scan().withStartRow(Bytes.toBytes(String.format("%02d", start)), startInclusive) + .withStopRow(Bytes.toBytes(String.format("%02d", stop)), stopInclusive))) { + results = scanner.next(expectedCount); + } + assertEquals(expectedCount, results.length); + for (int i = 0; i < expectedCount; i++) { + assertResultEquals(results[i], actualStart + i); + } + } + + private void testReversedStartRowStopRowInclusive(Table table, int start, boolean startInclusive, + int stop, boolean stopInclusive) throws IOException { + int actualStart = startInclusive ? start : start - 1; + int actualStop = stopInclusive ? stop - 1 : stop; + int expectedCount = actualStart - actualStop; + Result[] results; + try (ResultScanner scanner = table.getScanner( + new Scan().withStartRow(Bytes.toBytes(String.format("%02d", start)), startInclusive) + .withStopRow(Bytes.toBytes(String.format("%02d", stop)), stopInclusive) + .setReversed(true))) { + results = scanner.next(expectedCount); + } + assertEquals(expectedCount, results.length); + for (int i = 0; i < expectedCount; i++) { + assertResultEquals(results[i], actualStart - i); + } + } + + @Test + public void testStartRowStopRowInclusive() throws IOException, InterruptedException { + TableName tableName = TableName.valueOf("testStartRowStopRowInclusive"); + byte[][] splitKeys = new byte[8][]; + for (int i = 11; i < 99; i += 11) { + splitKeys[i / 11 - 1] = Bytes.toBytes(String.format("%02d", i)); + } + Table table = TEST_UTIL.createTable(tableName, FAMILY, splitKeys); + TEST_UTIL.waitTableAvailable(tableName); + try (BufferedMutator mutator = TEST_UTIL.getConnection().getBufferedMutator(tableName)) { + for (int i = 0; i < 100; i++) { + mutator.mutate(new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, QUALIFIER, + Bytes.toBytes(i))); + } + } + // from first region to last region + testStartRowStopRowInclusive(table, 1, true, 98, false); + testStartRowStopRowInclusive(table, 12, true, 34, true); + testStartRowStopRowInclusive(table, 23, true, 45, false); + testStartRowStopRowInclusive(table, 34, false, 56, true); + testStartRowStopRowInclusive(table, 45, false, 67, false); + + // from last region to first region + testReversedStartRowStopRowInclusive(table, 98, true, 1, false); + testReversedStartRowStopRowInclusive(table, 54, true, 32, true); + testReversedStartRowStopRowInclusive(table, 65, true, 43, false); + testReversedStartRowStopRowInclusive(table, 76, false, 54, true); + testReversedStartRowStopRowInclusive(table, 87, false, 65, false); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java index a3d563166ac..6d89d737b0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java @@ -89,15 +89,15 @@ public class TestStripeStoreFileManager { MockStoreFile sf = createFile(); manager.insertNewFiles(al(sf)); assertEquals(1, manager.getStorefileCount()); - Collection filesForGet = manager.getFilesForScanOrGet(true, KEY_A, KEY_A); + Collection filesForGet = manager.getFilesForScanOrGet(KEY_A, true, KEY_A, true); assertEquals(1, filesForGet.size()); assertTrue(filesForGet.contains(sf)); // Add some stripes and make sure we get this file for every stripe. manager.addCompactionResults(al(), al(createFile(OPEN_KEY, KEY_B), createFile(KEY_B, OPEN_KEY))); - assertTrue(manager.getFilesForScanOrGet(true, KEY_A, KEY_A).contains(sf)); - assertTrue(manager.getFilesForScanOrGet(true, KEY_C, KEY_C).contains(sf)); + assertTrue(manager.getFilesForScanOrGet(KEY_A, true, KEY_A, true).contains(sf)); + assertTrue(manager.getFilesForScanOrGet(KEY_C, true, KEY_C, true).contains(sf)); } @Test @@ -557,7 +557,7 @@ public class TestStripeStoreFileManager { byte[] start, byte[] end, Collection results) throws Exception { start = start != null ? start : HConstants.EMPTY_START_ROW; end = end != null ? end : HConstants.EMPTY_END_ROW; - Collection sfs = manager.getFilesForScanOrGet(isGet, start, end); + Collection sfs = manager.getFilesForScanOrGet(start, true, end, true); assertEquals(results.size(), sfs.size()); for (StoreFile result : results) { assertTrue(sfs.contains(result));