HBASE-18553 Expose scan cursor for asynchronous scanner
This commit is contained in:
parent
2a9cdd5e75
commit
4c74a73d57
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.shaded.io.netty.util.Timeout;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
@ -157,10 +158,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
||||||
|
|
||||||
private ScanResumerImpl resumer;
|
private ScanResumerImpl resumer;
|
||||||
|
|
||||||
public ScanControllerImpl(ScanResponse resp) {
|
public ScanControllerImpl(Optional<Cursor> cursor) {
|
||||||
callerThread = Thread.currentThread();
|
this.callerThread = Thread.currentThread();
|
||||||
cursor = resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor()))
|
this.cursor = cursor;
|
||||||
: Optional.empty();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void preCheck() {
|
private void preCheck() {
|
||||||
|
@ -476,10 +476,11 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
||||||
}
|
}
|
||||||
updateServerSideMetrics(scanMetrics, resp);
|
updateServerSideMetrics(scanMetrics, resp);
|
||||||
boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
|
boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
|
||||||
|
Result[] rawResults;
|
||||||
Result[] results;
|
Result[] results;
|
||||||
int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
|
int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
|
||||||
try {
|
try {
|
||||||
Result[] rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
|
rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
|
||||||
updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
|
updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
|
||||||
results = resultCache.addAndGet(
|
results = resultCache.addAndGet(
|
||||||
Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
|
Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
|
||||||
|
@ -493,12 +494,30 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ScanControllerImpl scanController = new ScanControllerImpl(resp);
|
ScanControllerImpl scanController;
|
||||||
if (results.length > 0) {
|
if (results.length > 0) {
|
||||||
|
scanController = new ScanControllerImpl(
|
||||||
|
resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor()))
|
||||||
|
: Optional.empty());
|
||||||
updateNextStartRowWhenError(results[results.length - 1]);
|
updateNextStartRowWhenError(results[results.length - 1]);
|
||||||
consumer.onNext(results, scanController);
|
consumer.onNext(results, scanController);
|
||||||
} else if (resp.hasHeartbeatMessage() && resp.getHeartbeatMessage()) {
|
} else {
|
||||||
consumer.onHeartbeat(scanController);
|
Optional<Cursor> cursor = Optional.empty();
|
||||||
|
if (resp.hasCursor()) {
|
||||||
|
cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
|
||||||
|
} else if (scan.isNeedCursorResult() && rawResults.length > 0) {
|
||||||
|
// It is size limit exceed and we need to return the last Result's row.
|
||||||
|
// When user setBatch and the scanner is reopened, the server may return Results that
|
||||||
|
// user has seen and the last Result can not be seen because the number is not enough.
|
||||||
|
// So the row keys of results may not be same, we must use the last one.
|
||||||
|
cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
|
||||||
|
}
|
||||||
|
scanController = new ScanControllerImpl(cursor);
|
||||||
|
if (isHeartbeatMessage || cursor.isPresent()) {
|
||||||
|
// only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
|
||||||
|
// want to pass a cursor to upper layer.
|
||||||
|
consumer.onHeartbeat(scanController);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ScanControllerState state = scanController.destroy();
|
ScanControllerState state = scanController.destroy();
|
||||||
if (state == ScanControllerState.TERMINATED) {
|
if (state == ScanControllerState.TERMINATED) {
|
||||||
|
|
|
@ -19,9 +19,6 @@ package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
|
@ -31,6 +28,8 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
|
* The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
|
||||||
|
@ -46,6 +45,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
|
||||||
|
|
||||||
private final long maxCacheSize;
|
private final long maxCacheSize;
|
||||||
|
|
||||||
|
private final Scan scan;
|
||||||
|
|
||||||
private final Queue<Result> queue = new ArrayDeque<>();
|
private final Queue<Result> queue = new ArrayDeque<>();
|
||||||
|
|
||||||
private ScanMetrics scanMetrics;
|
private ScanMetrics scanMetrics;
|
||||||
|
@ -61,6 +62,7 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
|
||||||
public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) {
|
public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) {
|
||||||
this.rawTable = table;
|
this.rawTable = table;
|
||||||
this.maxCacheSize = maxCacheSize;
|
this.maxCacheSize = maxCacheSize;
|
||||||
|
this.scan = scan;
|
||||||
table.scan(scan, this);
|
table.scan(scan, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,6 +100,10 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
|
||||||
public synchronized void onHeartbeat(ScanController controller) {
|
public synchronized void onHeartbeat(ScanController controller) {
|
||||||
if (closed) {
|
if (closed) {
|
||||||
controller.terminate();
|
controller.terminate();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (scan.isNeedCursorResult()) {
|
||||||
|
controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,9 +149,11 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Result result = queue.poll();
|
Result result = queue.poll();
|
||||||
cacheSize -= calcEstimatedSize(result);
|
if (!result.isCursor()) {
|
||||||
if (resumer != null && cacheSize <= maxCacheSize / 2) {
|
cacheSize -= calcEstimatedSize(result);
|
||||||
resumePrefetch();
|
if (resumer != null && cacheSize <= maxCacheSize / 2) {
|
||||||
|
resumePrefetch();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,8 +95,15 @@ public interface RawScanResultConsumer {
|
||||||
void onNext(Result[] results, ScanController controller);
|
void onNext(Result[] results, ScanController controller);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Indicate that there is an heartbeat message but we have not cumulated enough cells to call
|
* Indicate that there is a heartbeat message but we have not cumulated enough cells to call
|
||||||
* onNext.
|
* {@link #onNext(Result[], ScanController)}.
|
||||||
|
* <p>
|
||||||
|
* Note that this method will always be called when RS returns something to us but we do not have
|
||||||
|
* enough cells to call {@link #onNext(Result[], ScanController)}. Sometimes it may not be a
|
||||||
|
* 'heartbeat' message for RS, for example, we have a large row with many cells and size limit is
|
||||||
|
* exceeded before sending all the cells for this row. For RS it does send some data to us and the
|
||||||
|
* time limit has not been reached, but we can not return the data to client so here we call this
|
||||||
|
* method to tell client we have already received something.
|
||||||
* <p>
|
* <p>
|
||||||
* This method give you a chance to terminate a slow scan operation.
|
* This method give you a chance to terminate a slow scan operation.
|
||||||
* @param controller used to suspend or terminate the scan. Notice that the {@code controller}
|
* @param controller used to suspend or terminate the scan. Notice that the {@code controller}
|
||||||
|
|
|
@ -17,31 +17,31 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
|
|
||||||
@Category({ MediumTests.class, ClientTests.class })
|
public abstract class AbstractTestResultScannerCursor extends AbstractTestScanCursor {
|
||||||
public class TestScanCursor extends AbstractTestScanCursor {
|
|
||||||
|
protected abstract ResultScanner getScanner(Scan scan) throws Exception;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHeartbeatWithSparseFilter() throws Exception {
|
public void testHeartbeatWithSparseFilter() throws Exception {
|
||||||
try (ResultScanner scanner =
|
try (ResultScanner scanner = getScanner(createScanWithSparseFilter())) {
|
||||||
TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSparseFilter())) {
|
|
||||||
int num = 0;
|
int num = 0;
|
||||||
Result r;
|
Result r;
|
||||||
while ((r = scanner.next()) != null) {
|
while ((r = scanner.next()) != null) {
|
||||||
if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
|
if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
|
||||||
Assert.assertTrue(r.isCursor());
|
assertTrue(r.isCursor());
|
||||||
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
|
assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
|
||||||
r.getCursor().getRow());
|
r.getCursor().getRow());
|
||||||
} else {
|
} else {
|
||||||
Assert.assertFalse(r.isCursor());
|
assertFalse(r.isCursor());
|
||||||
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
|
assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
|
||||||
}
|
}
|
||||||
num++;
|
num++;
|
||||||
}
|
}
|
||||||
|
@ -50,18 +50,17 @@ public class TestScanCursor extends AbstractTestScanCursor {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHeartbeatWithSparseFilterReversed() throws Exception {
|
public void testHeartbeatWithSparseFilterReversed() throws Exception {
|
||||||
try (ResultScanner scanner = TEST_UTIL.getConnection().getTable(TABLE_NAME)
|
try (ResultScanner scanner = getScanner(createReversedScanWithSparseFilter())) {
|
||||||
.getScanner(createReversedScanWithSparseFilter())) {
|
|
||||||
int num = 0;
|
int num = 0;
|
||||||
Result r;
|
Result r;
|
||||||
while ((r = scanner.next()) != null) {
|
while ((r = scanner.next()) != null) {
|
||||||
if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
|
if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
|
||||||
Assert.assertTrue(r.isCursor());
|
assertTrue(r.isCursor());
|
||||||
Assert.assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / NUM_QUALIFIERS],
|
assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / NUM_QUALIFIERS],
|
||||||
r.getCursor().getRow());
|
r.getCursor().getRow());
|
||||||
} else {
|
} else {
|
||||||
Assert.assertFalse(r.isCursor());
|
assertFalse(r.isCursor());
|
||||||
Assert.assertArrayEquals(ROWS[0], r.getRow());
|
assertArrayEquals(ROWS[0], r.getRow());
|
||||||
}
|
}
|
||||||
num++;
|
num++;
|
||||||
}
|
}
|
||||||
|
@ -76,12 +75,12 @@ public class TestScanCursor extends AbstractTestScanCursor {
|
||||||
Result r;
|
Result r;
|
||||||
while ((r = scanner.next()) != null) {
|
while ((r = scanner.next()) != null) {
|
||||||
if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS) - 1) {
|
if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS) - 1) {
|
||||||
Assert.assertTrue(r.isCursor());
|
assertTrue(r.isCursor());
|
||||||
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
|
assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
|
||||||
r.getCursor().getRow());
|
r.getCursor().getRow());
|
||||||
} else {
|
} else {
|
||||||
Assert.assertFalse(r.isCursor());
|
assertFalse(r.isCursor());
|
||||||
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
|
assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
|
||||||
}
|
}
|
||||||
num++;
|
num++;
|
||||||
}
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.util.concurrent.ForkJoinPool;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
|
public class TestAsyncResultScannerCursor extends AbstractTestResultScannerCursor {
|
||||||
|
|
||||||
|
private static AsyncConnection CONN;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
AbstractTestScanCursor.setUpBeforeClass();
|
||||||
|
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
if (CONN != null) {
|
||||||
|
CONN.close();
|
||||||
|
}
|
||||||
|
AbstractTestScanCursor.tearDownAfterClass();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ResultScanner getScanner(Scan scan) throws Exception {
|
||||||
|
return CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan);
|
||||||
|
}
|
||||||
|
}
|
|
@ -27,70 +27,83 @@ import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
@Category({ MediumTests.class, ClientTests.class })
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
|
public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
|
||||||
|
|
||||||
|
private static AsyncConnection CONN;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
AbstractTestScanCursor.setUpBeforeClass();
|
||||||
|
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
if (CONN != null) {
|
||||||
|
CONN.close();
|
||||||
|
}
|
||||||
|
AbstractTestScanCursor.tearDownAfterClass();
|
||||||
|
}
|
||||||
|
|
||||||
private void doTest(boolean reversed)
|
private void doTest(boolean reversed)
|
||||||
throws InterruptedException, ExecutionException, IOException {
|
throws InterruptedException, ExecutionException, IOException {
|
||||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
try (AsyncConnection conn =
|
RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
|
||||||
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
|
table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(),
|
||||||
RawAsyncTable table = conn.getRawTable(TABLE_NAME);
|
new RawScanResultConsumer() {
|
||||||
table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(),
|
|
||||||
new RawScanResultConsumer() {
|
|
||||||
|
|
||||||
private int count;
|
private int count;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHeartbeat(ScanController controller) {
|
public void onHeartbeat(ScanController controller) {
|
||||||
int row = count / NUM_FAMILIES / NUM_QUALIFIERS;
|
int row = count / NUM_FAMILIES / NUM_QUALIFIERS;
|
||||||
if (reversed) {
|
if (reversed) {
|
||||||
row = NUM_ROWS - 1 - row;
|
row = NUM_ROWS - 1 - row;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
assertArrayEquals(ROWS[row], controller.cursor().get().getRow());
|
assertArrayEquals(ROWS[row], controller.cursor().get().getRow());
|
||||||
count++;
|
count++;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
throw e;
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNext(Result[] results, ScanController controller) {
|
||||||
|
try {
|
||||||
|
assertEquals(1, results.length);
|
||||||
|
assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS);
|
||||||
|
// we will always provide a scan cursor if time limit is reached.
|
||||||
|
if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) {
|
||||||
|
assertFalse(controller.cursor().isPresent());
|
||||||
|
} else {
|
||||||
|
assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
|
||||||
|
controller.cursor().get().getRow());
|
||||||
}
|
}
|
||||||
|
assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow());
|
||||||
|
count++;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
future.completeExceptionally(e);
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onNext(Result[] results, ScanController controller) {
|
public void onError(Throwable error) {
|
||||||
try {
|
future.completeExceptionally(error);
|
||||||
assertEquals(1, results.length);
|
}
|
||||||
assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS);
|
|
||||||
// we will always provide a scan cursor if time limit is reached.
|
|
||||||
if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) {
|
|
||||||
assertFalse(controller.cursor().isPresent());
|
|
||||||
} else {
|
|
||||||
assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
|
|
||||||
controller.cursor().get().getRow());
|
|
||||||
}
|
|
||||||
assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow());
|
|
||||||
count++;
|
|
||||||
} catch (Throwable e) {
|
|
||||||
future.completeExceptionally(e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onError(Throwable error) {
|
public void onComplete() {
|
||||||
future.completeExceptionally(error);
|
future.complete(null);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
@Override
|
future.get();
|
||||||
public void onComplete() {
|
|
||||||
future.complete(null);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
future.get();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -104,4 +117,50 @@ public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
|
||||||
throws IOException, InterruptedException, ExecutionException {
|
throws IOException, InterruptedException, ExecutionException {
|
||||||
doTest(true);
|
doTest(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSizeLimit() throws InterruptedException, ExecutionException {
|
||||||
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
|
||||||
|
table.scan(createScanWithSizeLimit(), new RawScanResultConsumer() {
|
||||||
|
|
||||||
|
private int count;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onHeartbeat(ScanController controller) {
|
||||||
|
try {
|
||||||
|
assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS],
|
||||||
|
controller.cursor().get().getRow());
|
||||||
|
count++;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
future.completeExceptionally(e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNext(Result[] results, ScanController controller) {
|
||||||
|
try {
|
||||||
|
assertFalse(controller.cursor().isPresent());
|
||||||
|
assertEquals(1, results.length);
|
||||||
|
assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS], results[0].getRow());
|
||||||
|
count++;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
future.completeExceptionally(e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable error) {
|
||||||
|
future.completeExceptionally(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
future.complete(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
future.get();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
|
public class TestResultScannerCursor extends AbstractTestResultScannerCursor {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ResultScanner getScanner(Scan scan) throws IOException {
|
||||||
|
return TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(scan);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue