HBASE-13262 Observe ScanResponse.moreResults in ClientScanner.

The RS already returns to the client whether or not it has additional
results to be returned in a subsequent call to scan(), but the ClientScanner
did not use or adhere to this value. Subsequently, this can lead to
bugs around moving to the next region too early. A new method was added
to ClientScanner in the name of testability.

Encapsulate server-state into RegionServerCallable to avoid
modifying parameterization of callable impls.

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Josh Elser 2015-03-28 18:56:52 -07:00 committed by Andrew Purtell
parent a6ff17b958
commit ced0e324a1
10 changed files with 1070 additions and 181 deletions

View File

@ -301,7 +301,7 @@ public class ClientScanner extends AbstractClientScanner {
return callable.isAnyRPCcancelled();
}
static Result[] call(ScannerCallableWithReplicas callable,
Result[] call(ScannerCallableWithReplicas callable,
RpcRetryingCaller<Result[]> caller, int scannerTimeout)
throws IOException, RuntimeException {
if (Thread.interrupted()) {
@ -353,126 +353,7 @@ public class ClientScanner extends AbstractClientScanner {
return null;
}
if (cache.size() == 0) {
Result[] values = null;
long remainingResultSize = maxScannerResultSize;
int countdown = this.caching;
// We need to reset it if it's a new callable that was created
// with a countdown in nextScanner
callable.setCaching(this.caching);
// This flag is set when we want to skip the result returned. We do
// this when we reset scanner because it split under us.
boolean retryAfterOutOfOrderException = true;
do {
try {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = call(callable, caller, scannerTimeout);
// When the replica switch happens, we need to do certain operations
// again. The callable will openScanner with the right startkey
// but we need to pick up from there. Bypass the rest of the loop
// and let the catch-up happen in the beginning of the loop as it
// happens for the cases where we see exceptions. Since only openScanner
// would have happened, values would be null
if (values == null && callable.switchedToADifferentReplica()) {
this.currentRegion = callable.getHRegionInfo();
continue;
}
retryAfterOutOfOrderException = true;
} catch (DoNotRetryIOException e) {
// An exception was thrown which makes any partial results that we were collecting
// invalid. The scanner will need to be reset to the beginning of a row.
clearPartialResults();
// DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
// to reset the scanner and come back in again.
if (e instanceof UnknownScannerException) {
long timeout = lastNext + scannerTimeout;
// If we are over the timeout, throw this exception to the client wrapped in
// a ScannerTimeoutException. Else, it's because the region moved and we used the old
// id against the new region server; reset the scanner.
if (timeout < System.currentTimeMillis()) {
long elapsed = System.currentTimeMillis() - lastNext;
ScannerTimeoutException ex =
new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
+ "timeout is currently set to " + scannerTimeout);
ex.initCause(e);
throw ex;
}
} else {
// If exception is any but the list below throw it back to the client; else setup
// the scanner and retry.
Throwable cause = e.getCause();
if ((cause != null && cause instanceof NotServingRegionException) ||
(cause != null && cause instanceof RegionServerStoppedException) ||
e instanceof OutOfOrderScannerNextException) {
// Pass
// It is easier writing the if loop test as list of what is allowed rather than
// as a list of what is not allowed... so if in here, it means we do not throw.
} else {
throw e;
}
}
// Else, its signal from depths of ScannerCallable that we need to reset the scanner.
if (this.lastResult != null) {
// The region has moved. We need to open a brand new scanner at
// the new location.
// Reset the startRow to the row we've seen last so that the new
// scanner starts at the correct row. Otherwise we may see previously
// returned rows again.
// (ScannerCallable by now has "relocated" the correct region)
if (scan.isReversed()) {
scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
} else {
scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
}
}
if (e instanceof OutOfOrderScannerNextException) {
if (retryAfterOutOfOrderException) {
retryAfterOutOfOrderException = false;
} else {
// TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
throw new DoNotRetryIOException("Failed after retry of " +
"OutOfOrderScannerNextException: was there a rpc timeout?", e);
}
}
// Clear region.
this.currentRegion = null;
// Set this to zero so we don't try and do an rpc and close on remote server when
// the exception we got was UnknownScanner or the Server is going down.
callable = null;
// This continue will take us to while at end of loop where we will set up new scanner.
continue;
}
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
}
lastNext = currentTime;
// Groom the array of Results that we received back from the server before adding that
// Results to the scanner's cache. If partial results are not allowed to be seen by the
// caller, all book keeping will be performed within this method.
List<Result> resultsToAddToCache = getResultsToAddToCache(values);
if (!resultsToAddToCache.isEmpty()) {
for (Result rs : resultsToAddToCache) {
cache.add(rs);
// We don't make Iterator here
for (Cell cell : rs.rawCells()) {
remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
}
countdown--;
this.lastResult = rs;
}
}
// Values == null means server-side filter has determined we must STOP
// !partialResults.isEmpty() means that we are still accumulating partial Results for a
// row. We should not change scanners before we receive all the partial Results for that
// row.
} while (remainingResultSize > 0 && countdown > 0
&& (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)));
loadCache();
}
if (cache.size() > 0) {
@ -489,6 +370,144 @@ public class ClientScanner extends AbstractClientScanner {
return cache != null ? cache.size() : 0;
}
/**
* Contact the servers to load more {@link Result}s in the cache.
*/
protected void loadCache() throws IOException {
Result[] values = null;
long remainingResultSize = maxScannerResultSize;
int countdown = this.caching;
// We need to reset it if it's a new callable that was created
// with a countdown in nextScanner
callable.setCaching(this.caching);
// This flag is set when we want to skip the result returned. We do
// this when we reset scanner because it split under us.
boolean retryAfterOutOfOrderException = true;
// We don't expect that the server will have more results for us if
// it doesn't tell us otherwise. We rely on the size or count of results
boolean serverHasMoreResults = false;
do {
try {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = call(callable, caller, scannerTimeout);
// When the replica switch happens, we need to do certain operations
// again. The callable will openScanner with the right startkey
// but we need to pick up from there. Bypass the rest of the loop
// and let the catch-up happen in the beginning of the loop as it
// happens for the cases where we see exceptions. Since only openScanner
// would have happened, values would be null
if (values == null && callable.switchedToADifferentReplica()) {
this.currentRegion = callable.getHRegionInfo();
continue;
}
retryAfterOutOfOrderException = true;
} catch (DoNotRetryIOException e) {
// An exception was thrown which makes any partial results that we were collecting
// invalid. The scanner will need to be reset to the beginning of a row.
clearPartialResults();
// DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
// to reset the scanner and come back in again.
if (e instanceof UnknownScannerException) {
long timeout = lastNext + scannerTimeout;
// If we are over the timeout, throw this exception to the client wrapped in
// a ScannerTimeoutException. Else, it's because the region moved and we used the old
// id against the new region server; reset the scanner.
if (timeout < System.currentTimeMillis()) {
long elapsed = System.currentTimeMillis() - lastNext;
ScannerTimeoutException ex =
new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
+ "timeout is currently set to " + scannerTimeout);
ex.initCause(e);
throw ex;
}
} else {
// If exception is any but the list below throw it back to the client; else setup
// the scanner and retry.
Throwable cause = e.getCause();
if ((cause != null && cause instanceof NotServingRegionException) ||
(cause != null && cause instanceof RegionServerStoppedException) ||
e instanceof OutOfOrderScannerNextException) {
// Pass
// It is easier writing the if loop test as list of what is allowed rather than
// as a list of what is not allowed... so if in here, it means we do not throw.
} else {
throw e;
}
}
// Else, its signal from depths of ScannerCallable that we need to reset the scanner.
if (this.lastResult != null) {
// The region has moved. We need to open a brand new scanner at
// the new location.
// Reset the startRow to the row we've seen last so that the new
// scanner starts at the correct row. Otherwise we may see previously
// returned rows again.
// (ScannerCallable by now has "relocated" the correct region)
if (scan.isReversed()) {
scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
} else {
scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
}
}
if (e instanceof OutOfOrderScannerNextException) {
if (retryAfterOutOfOrderException) {
retryAfterOutOfOrderException = false;
} else {
// TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
throw new DoNotRetryIOException("Failed after retry of " +
"OutOfOrderScannerNextException: was there a rpc timeout?", e);
}
}
// Clear region.
this.currentRegion = null;
// Set this to zero so we don't try and do an rpc and close on remote server when
// the exception we got was UnknownScanner or the Server is going down.
callable = null;
// This continue will take us to while at end of loop where we will set up new scanner.
continue;
}
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
}
lastNext = currentTime;
// Groom the array of Results that we received back from the server before adding that
// Results to the scanner's cache. If partial results are not allowed to be seen by the
// caller, all book keeping will be performed within this method.
List<Result> resultsToAddToCache = getResultsToAddToCache(values);
if (!resultsToAddToCache.isEmpty()) {
for (Result rs : resultsToAddToCache) {
cache.add(rs);
// We don't make Iterator here
for (Cell cell : rs.rawCells()) {
remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
}
countdown--;
this.lastResult = rs;
}
}
// We expect that the server won't have more results for us when we exhaust
// the size (bytes or count) of the results returned. If the server *does* inform us that
// there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
// get results is the moreResults context valid.
if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
// Only adhere to more server results when we don't have any partialResults
// as it keeps the outer loop logic the same.
serverHasMoreResults = callable.getServerHasMoreResults() & partialResults.isEmpty();
}
// Values == null means server-side filter has determined we must STOP
// !partialResults.isEmpty() means that we are still accumulating partial Results for a
// row. We should not change scanners before we receive all the partial Results for that
// row.
} while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults
&& (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)));
}
/**
* This method ensures all of our book keeping regarding partial results is kept up to date. This
* method should be called once we know that the results we received back from the RPC request do

View File

@ -143,6 +143,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
// exhausted current region.
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
// TODO use context from server
values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
this.currentRegion = smallScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();

View File

@ -136,11 +136,11 @@ public class ClientSmallScanner extends ClientScanner {
}
static ScannerCallableWithReplicas getSmallScanCallable(
ClusterConnection connection, TableName table, Scan scan,
ScanMetrics scanMetrics, byte[] localStartKey, final int cacheNum,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result []> caller) {
static ScannerCallableWithReplicas getSmallScanCallable(ClusterConnection connection,
TableName table, Scan scan, ScanMetrics scanMetrics, byte[] localStartKey,
final int cacheNum, RpcControllerFactory controllerFactory, ExecutorService pool,
int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
RpcRetryingCaller<Result[]> caller) {
scan.setStartRow(localStartKey);
SmallScannerCallable s = new SmallScannerCallable(
connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0);
@ -173,8 +173,15 @@ public class ClientSmallScanner extends ClientScanner {
controller.setPriority(getTableName());
controller.setCallTimeout(timeout);
response = getStub().scan(controller, request);
return ResponseConverter.getResults(controller.cellScanner(),
Result[] results = ResponseConverter.getResults(controller.cellScanner(),
response);
if (response.hasMoreResultsInRegion()) {
setHasMoreResultsContext(true);
setServerHasMoreResults(response.getMoreResultsInRegion());
} else {
setHasMoreResultsContext(false);
}
return results;
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
@ -207,6 +214,7 @@ public class ClientSmallScanner extends ClientScanner {
// exhausted current region.
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
// TODO Use the server's response about more results
values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
this.currentRegion = smallScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();

View File

@ -76,6 +76,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
private int logCutOffLatency = 1000;
private static String myAddress;
protected final int id;
protected boolean serverHasMoreResultsContext;
protected boolean serverHasMoreResults;
static {
try {
myAddress = DNS.getDefaultHost("default", "default");
@ -177,7 +179,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
@Override
@SuppressWarnings("deprecation")
public Result [] call(int callTimeout) throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
@ -223,12 +224,23 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
+ rows + " rows from scanner=" + scannerId);
}
}
if (response.hasMoreResults()
&& !response.getMoreResults()) {
// moreResults is only used for the case where a filter exhausts all elements
if (response.hasMoreResults() && !response.getMoreResults()) {
scannerId = -1L;
closed = true;
// Implied that no results were returned back, either.
return null;
}
// moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
// to size or quantity of results in the response.
if (response.hasMoreResultsInRegion()) {
// Set what the RS said
setHasMoreResultsContext(true);
setServerHasMoreResults(response.getMoreResultsInRegion());
} else {
// Server didn't respond whether it has more results or not.
setHasMoreResultsContext(false);
}
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
@ -394,4 +406,30 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
s.setCaching(this.caching);
return s;
}
/**
* Should the client attempt to fetch more results from this region
* @return True if the client should attempt to fetch more results, false otherwise.
*/
protected boolean getServerHasMoreResults() {
assert serverHasMoreResultsContext;
return this.serverHasMoreResults;
}
protected void setServerHasMoreResults(boolean serverHasMoreResults) {
this.serverHasMoreResults = serverHasMoreResults;
}
/**
* Did the server respond with information about whether more results might exist.
* Not guaranteed to respond with older server versions
* @return True if the server responded with information about more results.
*/
protected boolean hasMoreResultsContext() {
return serverHasMoreResultsContext;
}
protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
this.serverHasMoreResultsContext = serverHasMoreResultsContext;
}
}

View File

@ -111,6 +111,22 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
return currentScannerCallable.getHRegionInfo();
}
public boolean getServerHasMoreResults() {
return currentScannerCallable.getServerHasMoreResults();
}
public void setServerHasMoreResults(boolean serverHasMoreResults) {
currentScannerCallable.setServerHasMoreResults(serverHasMoreResults);
}
public boolean hasMoreResultsContext() {
return currentScannerCallable.hasMoreResultsContext();
}
public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext);
}
@Override
public Result [] call(int timeout) throws IOException {
// If the active replica callable was closed somewhere, invoke the RPC to

View File

@ -0,0 +1,489 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Test the ClientScanner.
*/
@Category(SmallTests.class)
public class TestClientScanner {
Scan scan;
ExecutorService pool;
Configuration conf;
ClusterConnection clusterConn;
RpcRetryingCallerFactory rpcFactory;
RpcControllerFactory controllerFactory;
@Before
@SuppressWarnings("deprecation")
public void setup() throws IOException {
clusterConn = Mockito.mock(ClusterConnection.class);
rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
controllerFactory = Mockito.mock(RpcControllerFactory.class);
pool = Executors.newSingleThreadExecutor();
scan = new Scan();
conf = new Configuration();
Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
}
@After
public void teardown() {
if (null != pool) {
pool.shutdownNow();
}
}
private static class MockClientScanner extends ClientScanner {
private boolean rpcFinished = false;
private boolean rpcFinishedFired = false;
public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout);
}
@Override
protected boolean nextScanner(int nbRows, final boolean done) throws IOException {
if (!rpcFinished) {
return super.nextScanner(nbRows, done);
}
// Enforce that we don't short-circuit more than once
if (rpcFinishedFired) {
throw new RuntimeException("Expected nextScanner to only be called once after " +
" short-circuit was triggered.");
}
rpcFinishedFired = true;
return false;
}
@Override
protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
int nbRows) {
scan.setStartRow(localStartKey);
ScannerCallable s =
new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
this.rpcControllerFactory);
s.setCaching(nbRows);
ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(getTable(), getConnection(),
s, pool, primaryOperationTimeout, scan,
getRetries(), scannerTimeout, caching, conf, caller);
return sr;
}
public void setRpcFinished(boolean rpcFinished) {
this.rpcFinished = rpcFinished;
}
}
@Test
@SuppressWarnings("unchecked")
public void testNoResultsHint() throws IOException {
final Result[] results = new Result[1];
KeyValue kv1 = new KeyValue("row".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum);
results[0] = Result.create(new Cell[] {kv1});
RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
private int count = 0;
@Override
public Result[] answer(InvocationOnMock invocation) throws Throwable {
ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
ScannerCallableWithReplicas.class);
switch (count) {
case 0: // initialize
case 2: // close
count++;
return null;
case 1:
count++;
callable.setHasMoreResultsContext(false);
return results;
default:
throw new RuntimeException("Expected only 2 invocations");
}
}
});
// Set a much larger cache and buffer size than we'll provide
scan.setCaching(100);
scan.setMaxResultSize(1000*1000);
try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
scanner.setRpcFinished(true);
InOrder inOrder = Mockito.inOrder(caller);
scanner.loadCache();
// One more call due to initializeScannerInConstruction()
inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt());
assertEquals(1, scanner.cache.size());
Result r = scanner.cache.poll();
assertNotNull(r);
CellScanner cs = r.cellScanner();
assertTrue(cs.advance());
assertEquals(kv1, cs.current());
assertFalse(cs.advance());
}
}
@Test
@SuppressWarnings("unchecked")
public void testSizeLimit() throws IOException {
final Result[] results = new Result[1];
KeyValue kv1 = new KeyValue("row".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum);
results[0] = Result.create(new Cell[] {kv1});
RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
private int count = 0;
@Override
public Result[] answer(InvocationOnMock invocation) throws Throwable {
ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
ScannerCallableWithReplicas.class);
switch (count) {
case 0: // initialize
case 2: // close
count++;
return null;
case 1:
count++;
callable.setHasMoreResultsContext(true);
callable.setServerHasMoreResults(false);
return results;
default:
throw new RuntimeException("Expected only 2 invocations");
}
}
});
Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
// Set a much larger cache
scan.setCaching(100);
// The single key-value will exit the loop
scan.setMaxResultSize(1);
try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
// Due to initializeScannerInConstruction()
Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
Mockito.anyInt());
InOrder inOrder = Mockito.inOrder(caller);
scanner.loadCache();
inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt());
assertEquals(1, scanner.cache.size());
Result r = scanner.cache.poll();
assertNotNull(r);
CellScanner cs = r.cellScanner();
assertTrue(cs.advance());
assertEquals(kv1, cs.current());
assertFalse(cs.advance());
}
}
@Test
@SuppressWarnings("unchecked")
public void testCacheLimit() throws IOException {
KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum);
final Result[] results = new Result[] {Result.create(new Cell[] {kv1}),
Result.create(new Cell[] {kv2}), Result.create(new Cell[] {kv3})};
RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
private int count = 0;
@Override
public Result[] answer(InvocationOnMock invocation) throws Throwable {
ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
ScannerCallableWithReplicas.class);
switch (count) {
case 0: // initialize
case 2: // close
count++;
return null;
case 1:
count++;
callable.setHasMoreResultsContext(true);
callable.setServerHasMoreResults(false);
return results;
default:
throw new RuntimeException("Expected only 2 invocations");
}
}
});
Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
// Set a small cache
scan.setCaching(1);
// Set a very large size
scan.setMaxResultSize(1000*1000);
try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
// Due to initializeScannerInConstruction()
Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
Mockito.anyInt());
InOrder inOrder = Mockito.inOrder(caller);
scanner.loadCache();
// Ensures that possiblyNextScanner isn't called at the end which would trigger
// another call to callWithoutRetries
inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt());
assertEquals(3, scanner.cache.size());
Result r = scanner.cache.poll();
assertNotNull(r);
CellScanner cs = r.cellScanner();
assertTrue(cs.advance());
assertEquals(kv1, cs.current());
assertFalse(cs.advance());
r = scanner.cache.poll();
assertNotNull(r);
cs = r.cellScanner();
assertTrue(cs.advance());
assertEquals(kv2, cs.current());
assertFalse(cs.advance());
r = scanner.cache.poll();
assertNotNull(r);
cs = r.cellScanner();
assertTrue(cs.advance());
assertEquals(kv3, cs.current());
assertFalse(cs.advance());
}
}
@Test
@SuppressWarnings("unchecked")
public void testNoMoreResults() throws IOException {
final Result[] results = new Result[1];
KeyValue kv1 = new KeyValue("row".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum);
results[0] = Result.create(new Cell[] {kv1});
RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
private int count = 0;
@Override
public Result[] answer(InvocationOnMock invocation) throws Throwable {
ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
ScannerCallableWithReplicas.class);
switch (count) {
case 0: // initialize
case 2: // close
count++;
return null;
case 1:
count++;
callable.setHasMoreResultsContext(true);
callable.setServerHasMoreResults(false);
return results;
default:
throw new RuntimeException("Expected only 2 invocations");
}
}
});
Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
// Set a much larger cache and buffer size than we'll provide
scan.setCaching(100);
scan.setMaxResultSize(1000*1000);
try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
// Due to initializeScannerInConstruction()
Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
Mockito.anyInt());
scanner.setRpcFinished(true);
InOrder inOrder = Mockito.inOrder(caller);
scanner.loadCache();
inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt());
assertEquals(1, scanner.cache.size());
Result r = scanner.cache.poll();
assertNotNull(r);
CellScanner cs = r.cellScanner();
assertTrue(cs.advance());
assertEquals(kv1, cs.current());
assertFalse(cs.advance());
}
}
@Test
@SuppressWarnings("unchecked")
public void testMoreResults() throws IOException {
final Result[] results1 = new Result[1];
KeyValue kv1 = new KeyValue("row".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum);
results1[0] = Result.create(new Cell[] {kv1});
final Result[] results2 = new Result[1];
KeyValue kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum);
results2[0] = Result.create(new Cell[] {kv2});
RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
private int count = 0;
@Override
public Result[] answer(InvocationOnMock invocation) throws Throwable {
ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
ScannerCallableWithReplicas.class);
switch (count) {
case 0: // initialize
case 3: // close
count++;
return null;
case 1:
count++;
callable.setHasMoreResultsContext(true);
callable.setServerHasMoreResults(true);
return results1;
case 2:
count++;
// The server reports back false WRT more results
callable.setHasMoreResultsContext(true);
callable.setServerHasMoreResults(false);
return results2;
default:
throw new RuntimeException("Expected only 2 invocations");
}
}
});
// Set a much larger cache and buffer size than we'll provide
scan.setCaching(100);
scan.setMaxResultSize(1000*1000);
try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
// Due to initializeScannerInConstruction()
Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
Mockito.anyInt());
InOrder inOrder = Mockito.inOrder(caller);
scanner.loadCache();
inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt());
assertEquals(1, scanner.cache.size());
Result r = scanner.cache.poll();
assertNotNull(r);
CellScanner cs = r.cellScanner();
assertTrue(cs.advance());
assertEquals(kv1, cs.current());
assertFalse(cs.advance());
scanner.setRpcFinished(true);
inOrder = Mockito.inOrder(caller);
scanner.loadCache();
inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt());
r = scanner.cache.poll();
assertNotNull(r);
cs = r.cellScanner();
assertTrue(cs.advance());
assertEquals(kv2, cs.current());
assertFalse(cs.advance());
}
}
}

View File

@ -17784,6 +17784,28 @@ public final class ClientProtos {
* </pre>
*/
boolean getPartialFlagPerResult(int index);
// optional bool more_results_in_region = 8;
/**
* <code>optional bool more_results_in_region = 8;</code>
*
* <pre>
* A server may choose to limit the number of results returned to the client for
* reasons such as the size in bytes or quantity of results accumulated. This field
* will true when more results exist in the current region.
* </pre>
*/
boolean hasMoreResultsInRegion();
/**
* <code>optional bool more_results_in_region = 8;</code>
*
* <pre>
* A server may choose to limit the number of results returned to the client for
* reasons such as the size in bytes or quantity of results accumulated. This field
* will true when more results exist in the current region.
* </pre>
*/
boolean getMoreResultsInRegion();
}
/**
* Protobuf type {@code ScanResponse}
@ -17912,6 +17934,11 @@ public final class ClientProtos {
input.popLimit(limit);
break;
}
case 64: {
bitField0_ |= 0x00000010;
moreResultsInRegion_ = input.readBool();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -18197,6 +18224,34 @@ public final class ClientProtos {
return partialFlagPerResult_.get(index);
}
// optional bool more_results_in_region = 8;
public static final int MORE_RESULTS_IN_REGION_FIELD_NUMBER = 8;
private boolean moreResultsInRegion_;
/**
* <code>optional bool more_results_in_region = 8;</code>
*
* <pre>
* A server may choose to limit the number of results returned to the client for
* reasons such as the size in bytes or quantity of results accumulated. This field
* will true when more results exist in the current region.
* </pre>
*/
public boolean hasMoreResultsInRegion() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
* <code>optional bool more_results_in_region = 8;</code>
*
* <pre>
* A server may choose to limit the number of results returned to the client for
* reasons such as the size in bytes or quantity of results accumulated. This field
* will true when more results exist in the current region.
* </pre>
*/
public boolean getMoreResultsInRegion() {
return moreResultsInRegion_;
}
private void initFields() {
cellsPerResult_ = java.util.Collections.emptyList();
scannerId_ = 0L;
@ -18205,6 +18260,7 @@ public final class ClientProtos {
results_ = java.util.Collections.emptyList();
stale_ = false;
partialFlagPerResult_ = java.util.Collections.emptyList();
moreResultsInRegion_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -18239,6 +18295,9 @@ public final class ClientProtos {
for (int i = 0; i < partialFlagPerResult_.size(); i++) {
output.writeBool(7, partialFlagPerResult_.get(i));
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeBool(8, moreResultsInRegion_);
}
getUnknownFields().writeTo(output);
}
@ -18283,6 +18342,10 @@ public final class ClientProtos {
size += dataSize;
size += 1 * getPartialFlagPerResultList().size();
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(8, moreResultsInRegion_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -18332,6 +18395,11 @@ public final class ClientProtos {
}
result = result && getPartialFlagPerResultList()
.equals(other.getPartialFlagPerResultList());
result = result && (hasMoreResultsInRegion() == other.hasMoreResultsInRegion());
if (hasMoreResultsInRegion()) {
result = result && (getMoreResultsInRegion()
== other.getMoreResultsInRegion());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -18373,6 +18441,10 @@ public final class ClientProtos {
hash = (37 * hash) + PARTIAL_FLAG_PER_RESULT_FIELD_NUMBER;
hash = (53 * hash) + getPartialFlagPerResultList().hashCode();
}
if (hasMoreResultsInRegion()) {
hash = (37 * hash) + MORE_RESULTS_IN_REGION_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getMoreResultsInRegion());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -18507,6 +18579,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000020);
partialFlagPerResult_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000040);
moreResultsInRegion_ = false;
bitField0_ = (bitField0_ & ~0x00000080);
return this;
}
@ -18570,6 +18644,10 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000040);
}
result.partialFlagPerResult_ = partialFlagPerResult_;
if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
to_bitField0_ |= 0x00000010;
}
result.moreResultsInRegion_ = moreResultsInRegion_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -18644,6 +18722,9 @@ public final class ClientProtos {
}
onChanged();
}
if (other.hasMoreResultsInRegion()) {
setMoreResultsInRegion(other.getMoreResultsInRegion());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -19423,6 +19504,63 @@ public final class ClientProtos {
return this;
}
// optional bool more_results_in_region = 8;
private boolean moreResultsInRegion_ ;
/**
* <code>optional bool more_results_in_region = 8;</code>
*
* <pre>
* A server may choose to limit the number of results returned to the client for
* reasons such as the size in bytes or quantity of results accumulated. This field
* will true when more results exist in the current region.
* </pre>
*/
public boolean hasMoreResultsInRegion() {
return ((bitField0_ & 0x00000080) == 0x00000080);
}
/**
* <code>optional bool more_results_in_region = 8;</code>
*
* <pre>
* A server may choose to limit the number of results returned to the client for
* reasons such as the size in bytes or quantity of results accumulated. This field
* will true when more results exist in the current region.
* </pre>
*/
public boolean getMoreResultsInRegion() {
return moreResultsInRegion_;
}
/**
* <code>optional bool more_results_in_region = 8;</code>
*
* <pre>
* A server may choose to limit the number of results returned to the client for
* reasons such as the size in bytes or quantity of results accumulated. This field
* will true when more results exist in the current region.
* </pre>
*/
public Builder setMoreResultsInRegion(boolean value) {
bitField0_ |= 0x00000080;
moreResultsInRegion_ = value;
onChanged();
return this;
}
/**
* <code>optional bool more_results_in_region = 8;</code>
*
* <pre>
* A server may choose to limit the number of results returned to the client for
* reasons such as the size in bytes or quantity of results accumulated. This field
* will true when more results exist in the current region.
* </pre>
*/
public Builder clearMoreResultsInRegion() {
bitField0_ = (bitField0_ & ~0x00000080);
moreResultsInRegion_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:ScanResponse)
}
@ -32556,58 +32694,59 @@ public final class ClientProtos {
"nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" +
"er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" +
"lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" +
"\004\022\037\n\027client_handles_partials\030\007 \001(\010\"\251\001\n\014S" +
"\004\022\037\n\027client_handles_partials\030\007 \001(\010\"\311\001\n\014S" +
"canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" +
"\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" +
"\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" +
"\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result" +
"\030\007 \003(\010\"\263\001\n\024BulkLoadHFileRequest\022 \n\006regio" +
"n\030\001 \002(\0132\020.RegionSpecifier\0225\n\013family_path",
"\030\002 \003(\0132 .BulkLoadHFileRequest.FamilyPath" +
"\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016" +
"\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoad" +
"HFileResponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026Coproce" +
"ssorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_" +
"name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007reque" +
"st\030\004 \002(\014\"9\n\030CoprocessorServiceResult\022\035\n\005" +
"value\030\001 \001(\0132\016.NameBytesPair\"d\n\031Coprocess" +
"orServiceRequest\022 \n\006region\030\001 \002(\0132\020.Regio" +
"nSpecifier\022%\n\004call\030\002 \002(\0132\027.CoprocessorSe",
"rviceCall\"]\n\032CoprocessorServiceResponse\022" +
" \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005val" +
"ue\030\002 \002(\0132\016.NameBytesPair\"{\n\006Action\022\r\n\005in" +
"dex\030\001 \001(\r\022 \n\010mutation\030\002 \001(\0132\016.MutationPr" +
"oto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004" +
" \001(\0132\027.CoprocessorServiceCall\"Y\n\014RegionA" +
"ction\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" +
"\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action" +
"\"D\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(" +
"\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021Resul",
"tOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 " +
"\001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.NameBy" +
"tesPair\0221\n\016service_result\030\004 \001(\0132\031.Coproc" +
"essorServiceResult\022#\n\tloadStats\030\005 \001(\0132\020." +
"RegionLoadStats\"f\n\022RegionActionResult\022-\n" +
"\021resultOrException\030\001 \003(\0132\022.ResultOrExcep" +
"tion\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"" +
"f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\0132\r." +
"RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcond" +
"ition\030\003 \001(\0132\n.Condition\"S\n\rMultiResponse",
"\022/\n\022regionActionResult\030\001 \003(\0132\023.RegionAct" +
"ionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consiste" +
"ncy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClient" +
"Service\022 \n\003Get\022\013.GetRequest\032\014.GetRespons" +
"e\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateResp" +
"onse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRespons" +
"e\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReques" +
"t\032\026.BulkLoadHFileResponse\022F\n\013ExecService" +
"\022\032.CoprocessorServiceRequest\032\033.Coprocess" +
"orServiceResponse\022R\n\027ExecRegionServerSer",
"vice\022\032.CoprocessorServiceRequest\032\033.Copro" +
"cessorServiceResponse\022&\n\005Multi\022\r.MultiRe" +
"quest\032\016.MultiResponseBB\n*org.apache.hado" +
"op.hbase.protobuf.generatedB\014ClientProto" +
"sH\001\210\001\001\240\001\001"
"\030\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\"\263" +
"\001\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132",
"\020.RegionSpecifier\0225\n\013family_path\030\002 \003(\0132 " +
".BulkLoadHFileRequest.FamilyPath\022\026\n\016assi" +
"gn_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family" +
"\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRes" +
"ponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServ" +
"iceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002" +
"(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014" +
"\"9\n\030CoprocessorServiceResult\022\035\n\005value\030\001 " +
"\001(\0132\016.NameBytesPair\"d\n\031CoprocessorServic" +
"eRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi",
"er\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCal" +
"l\"]\n\032CoprocessorServiceResponse\022 \n\006regio" +
"n\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\013" +
"2\016.NameBytesPair\"{\n\006Action\022\r\n\005index\030\001 \001(" +
"\r\022 \n\010mutation\030\002 \001(\0132\016.MutationProto\022\021\n\003g" +
"et\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.C" +
"oprocessorServiceCall\"Y\n\014RegionAction\022 \n" +
"\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomi" +
"c\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"D\n\017Regi" +
"onLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\r",
"heapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021ResultOrExcep" +
"tion\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Re" +
"sult\022!\n\texception\030\003 \001(\0132\016.NameBytesPair\022" +
"1\n\016service_result\030\004 \001(\0132\031.CoprocessorSer" +
"viceResult\022#\n\tloadStats\030\005 \001(\0132\020.RegionLo" +
"adStats\"f\n\022RegionActionResult\022-\n\021resultO" +
"rException\030\001 \003(\0132\022.ResultOrException\022!\n\t" +
"exception\030\002 \001(\0132\016.NameBytesPair\"f\n\014Multi" +
"Request\022#\n\014regionAction\030\001 \003(\0132\r.RegionAc" +
"tion\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondition\030\003 ",
"\001(\0132\n.Condition\"S\n\rMultiResponse\022/\n\022regi" +
"onActionResult\030\001 \003(\0132\023.RegionActionResul" +
"t\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006S" +
"TRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClientService\022" +
" \n\003Get\022\013.GetRequest\032\014.GetResponse\022)\n\006Mut" +
"ate\022\016.MutateRequest\032\017.MutateResponse\022#\n\004" +
"Scan\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBul" +
"kLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bulk" +
"LoadHFileResponse\022F\n\013ExecService\022\032.Copro" +
"cessorServiceRequest\032\033.CoprocessorServic",
"eResponse\022R\n\027ExecRegionServerService\022\032.C" +
"oprocessorServiceRequest\032\033.CoprocessorSe" +
"rviceResponse\022&\n\005Multi\022\r.MultiRequest\032\016." +
"MultiResponseBB\n*org.apache.hadoop.hbase" +
".protobuf.generatedB\014ClientProtosH\001\210\001\001\240\001" +
"\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -32709,7 +32848,7 @@ public final class ClientProtos {
internal_static_ScanResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ScanResponse_descriptor,
new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", });
new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", });
internal_static_BulkLoadHFileRequest_descriptor =
getDescriptor().getMessageTypes().get(14);
internal_static_BulkLoadHFileRequest_fieldAccessorTable = new

View File

@ -308,6 +308,11 @@ message ScanResponse {
// has false, false, true in it, then we know that on the client side, we need to
// make another RPC request since the last result was only a partial.
repeated bool partial_flag_per_result = 7;
// A server may choose to limit the number of results returned to the client for
// reasons such as the size in bytes or quantity of results accumulated. This field
// will true when more results exist in the current region.
optional bool more_results_in_region = 8;
}
/**

View File

@ -2213,10 +2213,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0;
boolean enforceMaxResultSizeAtCellLevel =
clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
NextState state = null;
while (i < rows) {
// Stop collecting results if we have exceeded maxResultSize
if (currentScanResultSize >= maxResultSize) {
builder.setMoreResultsInRegion(true);
break;
}
@ -2227,8 +2229,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
: -1;
// Collect values to be returned here
NextState state =
scanner.nextRaw(values, scanner.getBatch(), remainingResultSize);
state = scanner.nextRaw(values, scanner.getBatch(), remainingResultSize);
// Invalid states should never be returned. If one is seen, throw exception
// to stop the scan -- We have no way of telling how we should proceed
if (!NextState.isValidState(state)) {
@ -2260,6 +2261,17 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
values.clear();
}
// currentScanResultSize >= maxResultSize should be functionally equivalent to
// state.sizeLimitReached()
if (null != state
&& (currentScanResultSize >= maxResultSize || i >= rows || state
.hasMoreValues())) {
// We stopped prematurely
builder.setMoreResultsInRegion(true);
} else {
// We didn't get a single batch
builder.setMoreResultsInRegion(false);
}
}
region.readRequestsCount.add(i);
region.getMetrics().updateScanNext(totalCellSize);

View File

@ -0,0 +1,162 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestSizeFailures {
final Log LOG = LogFactory.getLog(getClass());
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static byte [] FAMILY = Bytes.toBytes("testFamily");
protected static int SLAVES = 1;
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Uncomment the following lines if more verbosity is needed for
// debugging (see HBASE-12285 for details).
//((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
//((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
//((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean("hbase.table.sanity.checks", true); // ignore sanity checks in the server
TEST_UTIL.startMiniCluster(SLAVES);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* Basic client side validation of HBASE-13262
*/
@Test
public void testScannerSeesAllRecords() throws Exception {
final int NUM_ROWS = 1000 * 1000, NUM_COLS = 10;
final TableName TABLENAME = TableName.valueOf("testScannerSeesAllRecords");
List<byte[]> qualifiers = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
qualifiers.add(Bytes.toBytes(Integer.toString(i)));
}
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
HTableDescriptor desc = new HTableDescriptor(TABLENAME);
desc.addFamily(hcd);
byte[][] splits = new byte[9][2];
for (int i = 1; i < 10; i++) {
int split = 48 + i;
splits[i - 1][0] = (byte) (split >>> 8);
splits[i - 1][0] = (byte) (split);
}
TEST_UTIL.getHBaseAdmin().createTable(desc, splits);
Connection conn = TEST_UTIL.getConnection();
try (Table table = conn.getTable(TABLENAME)) {
List<Put> puts = new LinkedList<>();
for (int i = 0; i < NUM_ROWS; i++) {
Put p = new Put(Bytes.toBytes(Integer.toString(i)));
for (int j = 0; j < NUM_COLS; j++) {
byte[] value = new byte[50];
Bytes.random(value);
p.addColumn(FAMILY, Bytes.toBytes(Integer.toString(j)), value);
}
puts.add(p);
if (puts.size() == 1000) {
Object[] results = new Object[1000];
try {
table.batch(puts, results);
} catch (IOException e) {
LOG.error("Failed to write data", e);
LOG.debug("Errors: " + Arrays.toString(results));
}
puts.clear();
}
}
if (puts.size() > 0) {
Object[] results = new Object[puts.size()];
try {
table.batch(puts, results);
} catch (IOException e) {
LOG.error("Failed to write data", e);
LOG.debug("Errors: " + Arrays.toString(results));
}
}
// Flush the memstore to disk
TEST_UTIL.getHBaseAdmin().flush(TABLENAME);
TreeSet<Integer> rows = new TreeSet<>();
long rowsObserved = 0l;
long entriesObserved = 0l;
Scan s = new Scan();
s.addFamily(FAMILY);
s.setMaxResultSize(-1);
s.setBatch(-1);
s.setCaching(500);
ResultScanner scanner = table.getScanner(s);
// Read all the records in the table
for (Result result : scanner) {
rowsObserved++;
String row = new String(result.getRow());
rows.add(Integer.parseInt(row));
while (result.advance()) {
entriesObserved++;
// result.current();
}
}
// Verify that we see 1M rows and 10M cells
assertEquals(NUM_ROWS, rowsObserved);
assertEquals(NUM_ROWS * NUM_COLS, entriesObserved);
}
conn.close();
}
}