HBASE-18553 Expose scan cursor for asynchronous scanner

zhangduo 2017-08-15 17:15:06 +08:00
parent 1bae5cabf9
commit 770312a8c2
7 changed files with 262 additions and 87 deletions

AsyncScanSingleRegionRpcRetryingCaller.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.shaded.io.netty.util.Timeout;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -157,10 +158,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     private ScanResumerImpl resumer;

-    public ScanControllerImpl(ScanResponse resp) {
-      callerThread = Thread.currentThread();
-      cursor = resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor()))
-          : Optional.empty();
+    public ScanControllerImpl(Optional<Cursor> cursor) {
+      this.callerThread = Thread.currentThread();
+      this.cursor = cursor;
     }

     private void preCheck() {
@@ -476,10 +476,11 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     }
     updateServerSideMetrics(scanMetrics, resp);
     boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
+    Result[] rawResults;
     Result[] results;
     int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
     try {
-      Result[] rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
+      rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
       updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
       results = resultCache.addAndGet(
         Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
@@ -493,13 +494,31 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       return;
     }
-    ScanControllerImpl scanController = new ScanControllerImpl(resp);
+    ScanControllerImpl scanController;
     if (results.length > 0) {
+      scanController = new ScanControllerImpl(
+        resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor()))
+            : Optional.empty());
       updateNextStartRowWhenError(results[results.length - 1]);
       consumer.onNext(results, scanController);
-    } else if (resp.hasHeartbeatMessage() && resp.getHeartbeatMessage()) {
-      consumer.onHeartbeat(scanController);
-    }
+    } else {
+      Optional<Cursor> cursor = Optional.empty();
+      if (resp.hasCursor()) {
+        cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
+      } else if (scan.isNeedCursorResult() && rawResults.length > 0) {
+        // It is size limit exceed and we need to return the last Result's row.
+        // When user setBatch and the scanner is reopened, the server may return Results that
+        // user has seen and the last Result can not be seen because the number is not enough.
+        // So the row keys of results may not be same, we must use the last one.
+        cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
+      }
+      scanController = new ScanControllerImpl(cursor);
+      if (isHeartbeatMessage || cursor.isPresent()) {
+        // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
+        // want to pass a cursor to upper layer.
+        consumer.onHeartbeat(scanController);
+      }
+    }
     ScanControllerState state = scanController.destroy();
     if (state == ScanControllerState.TERMINATED) {
       if (resp.getMoreResultsInRegion()) {
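The new else-branch above means onHeartbeat can now carry a cursor even when no complete row is ready for onNext. A minimal caller-side sketch of that path, illustrative only: it assumes Scan#setNeedCursorResult(boolean) is the setter behind the scan.isNeedCursorResult() check used above, and it just prints progress instead of doing real work.

import org.apache.hadoop.hbase.client.RawAsyncTable;
import org.apache.hadoop.hbase.client.RawScanResultConsumer;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative sketch, not part of this commit.
public class CursorProgressSketch {
  public static void scanWithProgress(RawAsyncTable table) {
    Scan scan = new Scan();
    // Assumed setter paired with the Scan#isNeedCursorResult() check used above.
    scan.setNeedCursorResult(true);
    table.scan(scan, new RawScanResultConsumer() {
      @Override
      public void onNext(Result[] results, ScanController controller) {
        // complete rows arrive here, as before
      }

      @Override
      public void onHeartbeat(ScanController controller) {
        // with the change above, a heartbeat may carry the row the server has scanned up to
        controller.cursor().ifPresent(
          c -> System.out.println("scanned up to " + Bytes.toStringBinary(c.getRow())));
      }

      @Override
      public void onError(Throwable error) {
        // handle or log the failure
      }

      @Override
      public void onComplete() {
        // scan finished
      }
    });
  }
}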

AsyncTableResultScanner.java

@@ -19,9 +19,6 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;

-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayDeque;
@@ -31,6 +28,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;

 /**
  * The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
@@ -46,6 +45,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
   private final long maxCacheSize;

+  private final Scan scan;
+
   private final Queue<Result> queue = new ArrayDeque<>();

   private ScanMetrics scanMetrics;
@@ -61,6 +62,7 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
   public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) {
     this.rawTable = table;
     this.maxCacheSize = maxCacheSize;
+    this.scan = scan;
     table.scan(scan, this);
   }
@@ -98,6 +100,10 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
   public synchronized void onHeartbeat(ScanController controller) {
     if (closed) {
       controller.terminate();
+      return;
+    }
+    if (scan.isNeedCursorResult()) {
+      controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c)));
     }
   }
@@ -143,10 +149,12 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
       }
     }
     Result result = queue.poll();
-    cacheSize -= calcEstimatedSize(result);
-    if (resumer != null && cacheSize <= maxCacheSize / 2) {
-      resumePrefetch();
+    if (!result.isCursor()) {
+      cacheSize -= calcEstimatedSize(result);
+      if (resumer != null && cacheSize <= maxCacheSize / 2) {
+        resumePrefetch();
+      }
     }
     return result;
   }
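With the change above, the blocking ResultScanner backed by AsyncTableResultScanner can interleave cursor Results (built via Result.createCursorResult) with data rows, and cursor Results are not counted against the cache size. A consumption sketch, hedged: the Table, Scan, and row handler are assumed caller-supplied, and setNeedCursorResult is the assumed opt-in setter matching the isNeedCursorResult() check above.

import java.io.IOException;
import java.util.function.Consumer;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

// Illustrative sketch, not part of this commit.
final class CursorAwareScanSketch {
  static void drain(Table table, Scan scan, Consumer<Result> handleRow) throws IOException {
    scan.setNeedCursorResult(true); // assumed setter paired with Scan#isNeedCursorResult()
    try (ResultScanner scanner = table.getScanner(scan)) {
      for (Result r; (r = scanner.next()) != null;) {
        if (r.isCursor()) {
          // not a data row: only the row key the server has progressed to
          byte[] progressRow = r.getCursor().getRow();
          continue; // e.g. report progressRow to a progress tracker
        }
        handleRow.accept(r); // a real row with cells
      }
    }
  }
}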

RawScanResultConsumer.java

@@ -95,8 +95,15 @@ public interface RawScanResultConsumer {
   void onNext(Result[] results, ScanController controller);

   /**
-   * Indicate that there is an heartbeat message but we have not cumulated enough cells to call
-   * onNext.
+   * Indicate that there is a heartbeat message but we have not cumulated enough cells to call
+   * {@link #onNext(Result[], ScanController)}.
+   * <p>
+   * Note that this method will always be called when RS returns something to us but we do not have
+   * enough cells to call {@link #onNext(Result[], ScanController)}. Sometimes it may not be a
+   * 'heartbeat' message for RS, for example, we have a large row with many cells and size limit is
+   * exceeded before sending all the cells for this row. For RS it does send some data to us and the
+   * time limit has not been reached, but we can not return the data to client so here we call this
+   * method to tell client we have already received something.
    * <p>
    * This method give you a chance to terminate a slow scan operation.
    * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
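Since onHeartbeat now fires whenever the server returns without enough cells for onNext, it is also a natural place to give up on a scan that is making no visible progress, as the javadoc above suggests. A hedged sketch using only the controller API shown in this commit; the threshold and the empty error/complete handlers are placeholders.

import org.apache.hadoop.hbase.client.RawScanResultConsumer;
import org.apache.hadoop.hbase.client.Result;

// Illustrative sketch, not part of this commit.
class GiveUpOnSlowScan implements RawScanResultConsumer {

  private int idleHeartbeats;

  @Override
  public void onNext(Result[] results, ScanController controller) {
    idleHeartbeats = 0; // real rows arrived, reset the counter
  }

  @Override
  public void onHeartbeat(ScanController controller) {
    if (++idleHeartbeats > 100) {
      controller.terminate(); // abandon a scan that only heartbeats
    }
  }

  @Override
  public void onError(Throwable error) {
    // placeholder: surface the error to the application
  }

  @Override
  public void onComplete() {
    // placeholder: scan finished normally
  }
}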

TestScanCursor.java (renamed to AbstractTestResultScannerCursor.java)

@@ -17,31 +17,31 @@
  */
 package org.apache.hadoop.hbase.client;

+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;

-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.junit.Assert;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;

-@Category({ MediumTests.class, ClientTests.class })
-public class TestScanCursor extends AbstractTestScanCursor {
+public abstract class AbstractTestResultScannerCursor extends AbstractTestScanCursor {
+
+  protected abstract ResultScanner getScanner(Scan scan) throws Exception;

   @Test
   public void testHeartbeatWithSparseFilter() throws Exception {
-    try (ResultScanner scanner =
-        TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSparseFilter())) {
+    try (ResultScanner scanner = getScanner(createScanWithSparseFilter())) {
       int num = 0;
       Result r;
       while ((r = scanner.next()) != null) {
         if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
-          Assert.assertTrue(r.isCursor());
-          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
+          assertTrue(r.isCursor());
+          assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
               r.getCursor().getRow());
         } else {
-          Assert.assertFalse(r.isCursor());
-          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
+          assertFalse(r.isCursor());
+          assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
         }
         num++;
       }
@@ -50,18 +50,17 @@ public class TestScanCursor extends AbstractTestScanCursor {
   @Test
   public void testHeartbeatWithSparseFilterReversed() throws Exception {
-    try (ResultScanner scanner = TEST_UTIL.getConnection().getTable(TABLE_NAME)
-        .getScanner(createReversedScanWithSparseFilter())) {
+    try (ResultScanner scanner = getScanner(createReversedScanWithSparseFilter())) {
       int num = 0;
       Result r;
       while ((r = scanner.next()) != null) {
         if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
-          Assert.assertTrue(r.isCursor());
-          Assert.assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / NUM_QUALIFIERS],
+          assertTrue(r.isCursor());
+          assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / NUM_QUALIFIERS],
               r.getCursor().getRow());
         } else {
-          Assert.assertFalse(r.isCursor());
-          Assert.assertArrayEquals(ROWS[0], r.getRow());
+          assertFalse(r.isCursor());
+          assertArrayEquals(ROWS[0], r.getRow());
         }
         num++;
       }
@@ -76,12 +75,12 @@ public class TestScanCursor extends AbstractTestScanCursor {
       Result r;
       while ((r = scanner.next()) != null) {
         if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS) - 1) {
-          Assert.assertTrue(r.isCursor());
-          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
+          assertTrue(r.isCursor());
+          assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
               r.getCursor().getRow());
         } else {
-          Assert.assertFalse(r.isCursor());
-          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
+          assertFalse(r.isCursor());
+          assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
         }
         num++;
       }

TestAsyncResultScannerCursor.java (new file)

@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.util.concurrent.ForkJoinPool;

import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;

@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncResultScannerCursor extends AbstractTestResultScannerCursor {

  private static AsyncConnection CONN;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    AbstractTestScanCursor.setUpBeforeClass();
    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
  }

  public static void tearDownAfterClass() throws Exception {
    if (CONN != null) {
      CONN.close();
    }
    AbstractTestScanCursor.tearDownAfterClass();
  }

  @Override
  protected ResultScanner getScanner(Scan scan) throws Exception {
    return CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan);
  }
}

TestRawAsyncScanCursor.java

@@ -27,18 +27,32 @@ import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

 @Category({ MediumTests.class, ClientTests.class })
 public class TestRawAsyncScanCursor extends AbstractTestScanCursor {

+  private static AsyncConnection CONN;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    AbstractTestScanCursor.setUpBeforeClass();
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+  }
+
+  public static void tearDownAfterClass() throws Exception {
+    if (CONN != null) {
+      CONN.close();
+    }
+    AbstractTestScanCursor.tearDownAfterClass();
+  }
+
   private void doTest(boolean reversed)
       throws InterruptedException, ExecutionException, IOException {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    try (AsyncConnection conn =
-        ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
-      RawAsyncTable table = conn.getRawTable(TABLE_NAME);
+    RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
     table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(),
       new RawScanResultConsumer() {
@@ -91,7 +105,6 @@ public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
       });
     future.get();
   }
-  }

   @Test
   public void testHeartbeatWithSparseFilter()
@@ -104,4 +117,50 @@ public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
       throws IOException, InterruptedException, ExecutionException {
     doTest(true);
   }
+
+  @Test
+  public void testSizeLimit() throws InterruptedException, ExecutionException {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
+    table.scan(createScanWithSizeLimit(), new RawScanResultConsumer() {
+
+      private int count;
+
+      @Override
+      public void onHeartbeat(ScanController controller) {
+        try {
+          assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS],
+            controller.cursor().get().getRow());
+          count++;
+        } catch (Throwable e) {
+          future.completeExceptionally(e);
+          throw e;
+        }
+      }
+
+      @Override
+      public void onNext(Result[] results, ScanController controller) {
+        try {
+          assertFalse(controller.cursor().isPresent());
+          assertEquals(1, results.length);
+          assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS], results[0].getRow());
+          count++;
+        } catch (Throwable e) {
+          future.completeExceptionally(e);
+          throw e;
+        }
+      }
+
+      @Override
+      public void onError(Throwable error) {
+        future.completeExceptionally(error);
+      }
+
+      @Override
+      public void onComplete() {
+        future.complete(null);
+      }
+    });
+    future.get();
+  }
 }

TestResultScannerCursor.java (new file)

@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.IOException;

import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.experimental.categories.Category;

@Category({ MediumTests.class, ClientTests.class })
public class TestResultScannerCursor extends AbstractTestResultScannerCursor {

  @Override
  protected ResultScanner getScanner(Scan scan) throws IOException {
    return TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(scan);
  }
}