diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index 5fd00a5f6ed..48f004c0a29 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -188,6 +188,7 @@ class AsyncClientScanner { private CompletableFuture openScanner(int replicaId) { return conn.callerFactory. single().table(tableName) .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) + .priority(scan.getPriority()) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 1fa3c81e5d1..7f19180a0ab 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -36,6 +36,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; @@ -347,7 +348,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void closeScanner() { incRPCCallsMetrics(scanMetrics, regionServerRemote); - resetController(controller, rpcTimeoutNs, priority); + resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS); ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); stub.scan(controller, req, resp -> { if (controller.failed()) { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java index b1047985eb6..bc5ebf4e9ff 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java @@ -17,10 +17,14 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS; import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS; import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -33,8 +37,13 @@ import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.Arrays; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell.Type; @@ -59,9 +68,7 @@ import org.junit.rules.TestName; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; - import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; @@ -91,6 +98,8 @@ public class TestAsyncTableRpcPriority { private ClientService.Interface stub; + private ExecutorService threadPool; + private AsyncConnection conn; @Rule @@ -98,34 +107,9 @@ public class TestAsyncTableRpcPriority { @Before public void setUp() throws IOException { + this.threadPool = Executors.newSingleThreadExecutor(); stub = mock(ClientService.Interface.class); - AtomicInteger scanNextCalled = new AtomicInteger(0); - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - ScanRequest req = invocation.getArgument(1); - RpcCallback done = invocation.getArgument(2); - if (!req.hasScannerId()) { - done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800) - .setMoreResultsInRegion(true).setMoreResults(true).build()); - } else { - if (req.hasCloseScanner() && req.getCloseScanner()) { - done.run(ScanResponse.getDefaultInstance()); - } else { - Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put) - .setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) - .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) - .setValue(Bytes.toBytes("v")).build(); - Result result = Result.create(Arrays.asList(cell)); - done.run( - ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true) - .setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build()); - } - } - return null; - } - }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any()); doAnswer(new Answer() { @Override @@ -218,6 +202,16 @@ public class TestAsyncTableRpcPriority { }); } + private ScanRequest assertScannerCloseRequest() { + return argThat(new ArgumentMatcher() { + + @Override + public boolean matches(ScanRequest request) { + return request.hasCloseScanner() && request.getCloseScanner(); + } + }); + } + @Test public void testGet() { conn.getTable(TableName.valueOf(name.getMethodName())) @@ -478,53 +472,113 @@ public class TestAsyncTableRpcPriority { any(ClientProtos.MultiRequest.class), any()); } - @Test - public void testScan() throws IOException, InterruptedException { - try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName())) - .getScanner(new Scan().setCaching(1).setMaxResultSize(1).setPriority(19))) { - assertNotNull(scanner.next()); - Thread.sleep(1000); - } - Thread.sleep(1000); - // open, next, several renew lease, and then close - verify(stub, atLeast(4)).scan(assertPriority(19), any(ScanRequest.class), any()); + private CompletableFuture mockScanReturnRenewFuture(int scanPriority) { + int scannerId = 1; + CompletableFuture future = new CompletableFuture<>(); + AtomicInteger scanNextCalled = new AtomicInteger(0); + doAnswer(new Answer() { + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + threadPool.submit(() -> { + ScanRequest req = invocation.getArgument(1); + RpcCallback done = invocation.getArgument(2); + if (!req.hasScannerId()) { + done.run(ScanResponse.newBuilder() + .setScannerId(scannerId).setTtl(800) + .setMoreResultsInRegion(true).setMoreResults(true) + .build()); + } else { + if (req.hasRenew() && req.getRenew()) { + future.complete(null); + } + + assertFalse("close scanner should not come in with scan priority " + scanPriority, + req.hasCloseScanner() && req.getCloseScanner()); + + Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) + .setType(Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) + .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) + .setValue(Bytes.toBytes("v")).build(); + Result result = Result.create(Arrays.asList(cell)); + done.run( + ScanResponse.newBuilder() + .setScannerId(scannerId).setTtl(800).setMoreResultsInRegion(true) + .setMoreResults(true).addResults(ProtobufUtil.toResult(result)) + .build()); + } + }); + return null; + } + }).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any()); + + doAnswer(new Answer() { + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + threadPool.submit(() ->{ + ScanRequest req = invocation.getArgument(1); + RpcCallback done = invocation.getArgument(2); + assertTrue("close request should have scannerId", req.hasScannerId()); + assertEquals("close request's scannerId should match", scannerId, req.getScannerId()); + assertTrue("close request should have closerScanner set", + req.hasCloseScanner() && req.getCloseScanner()); + + done.run(ScanResponse.getDefaultInstance()); + }); + return null; + } + }).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any()); + return future; } @Test - public void testScanNormalTable() throws IOException, InterruptedException { - try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName())) - .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { - assertNotNull(scanner.next()); - Thread.sleep(1000); - } - Thread.sleep(1000); - // open, next, several renew lease, and then close - verify(stub, atLeast(4)).scan(assertPriority(NORMAL_QOS), any(ScanRequest.class), any()); + public void testScan() throws Exception { + CompletableFuture renewFuture = mockScanReturnRenewFuture(19); + testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(19)); } @Test - public void testScanSystemTable() throws IOException, InterruptedException { - try (ResultScanner scanner = - conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) - .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { - assertNotNull(scanner.next()); - Thread.sleep(1000); - } - Thread.sleep(1000); - // open, next, several renew lease, and then close - verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any()); + public void testScanNormalTable() throws Exception { + CompletableFuture renewFuture = mockScanReturnRenewFuture(NORMAL_QOS); + testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(NORMAL_QOS)); } @Test - public void testScanMetaTable() throws IOException, InterruptedException { - try (ResultScanner scanner = conn.getTable(TableName.META_TABLE_NAME) - .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { + public void testScanSystemTable() throws Exception { + CompletableFuture renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS); + testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()), + renewFuture, Optional.empty()); + } + + @Test + public void testScanMetaTable() throws Exception { + CompletableFuture renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS); + testForTable(TableName.META_TABLE_NAME, renewFuture, Optional.empty()); + } + + private void testForTable(TableName tableName, CompletableFuture renewFuture, + Optional priority) throws Exception { + Scan scan = new Scan().setCaching(1).setMaxResultSize(1); + priority.ifPresent(scan::setPriority); + + try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) { assertNotNull(scanner.next()); - Thread.sleep(1000); + // wait for at least one renew to come in before closing + renewFuture.join(); } - Thread.sleep(1000); - // open, next, several renew lease, and then close - verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any()); + + // ensures the close thread has time to finish before asserting + threadPool.shutdown(); + threadPool.awaitTermination(5, TimeUnit.SECONDS); + + // just verify that the calls happened. verification of priority occurred in the mocking + // open, next, then one or more lease renewals, then close + verify(stub, atLeast(4)).scan(any(), any(ScanRequest.class), any()); + // additionally, explicitly check for a close request + verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index c34f8727949..14456eca1b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -129,7 +129,7 @@ public abstract class AbstractTestAsyncTableScan { protected abstract Scan createScan(); - protected abstract List doScan(Scan scan) throws Exception; + protected abstract List doScan(Scan scan, int closeAfter) throws Exception; protected final List convertFromBatchResult(List results) { assertTrue(results.size() % 2 == 0); @@ -145,7 +145,7 @@ public abstract class AbstractTestAsyncTableScan { @Test public void testScanAll() throws Exception { - List results = doScan(createScan()); + List results = doScan(createScan(), -1); // make sure all scanners are closed at RS side TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) .forEach( @@ -169,7 +169,7 @@ public abstract class AbstractTestAsyncTableScan { @Test public void testReversedScanAll() throws Exception { - List results = doScan(createScan().setReversed(true)); + List results = doScan(createScan().setReversed(true), -1); assertEquals(COUNT, results.size()); IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1)); } @@ -178,7 +178,7 @@ public abstract class AbstractTestAsyncTableScan { public void testScanNoStopKey() throws Exception { int start = 345; List results = - doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)))); + doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1); assertEquals(COUNT - start, results.size()); IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i)); } @@ -187,7 +187,7 @@ public abstract class AbstractTestAsyncTableScan { public void testReverseScanNoStopKey() throws Exception { int start = 765; List results = doScan( - createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true)); + createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true), -1); assertEquals(start + 1, results.size()); IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i)); } @@ -195,7 +195,7 @@ public abstract class AbstractTestAsyncTableScan { @Test public void testScanWrongColumnFamily() throws Exception { try { - doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily"))); + doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1); } catch (Exception e) { assertTrue(e instanceof NoSuchColumnFamilyException || e.getCause() instanceof NoSuchColumnFamilyException); @@ -203,20 +203,28 @@ public abstract class AbstractTestAsyncTableScan { } private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, - int limit) throws Exception { + int limit) throws Exception { + testScan(start, startInclusive, stop, stopInclusive, limit, -1); + } + + private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, + int limit, int closeAfter) throws Exception { Scan scan = createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive); if (limit > 0) { scan.setLimit(limit); } - List results = doScan(scan); + List results = doScan(scan, closeAfter); int actualStart = startInclusive ? start : start + 1; int actualStop = stopInclusive ? stop + 1 : stop; int count = actualStop - actualStart; if (limit > 0) { count = Math.min(count, limit); } + if (closeAfter > 0) { + count = Math.min(count, closeAfter); + } assertEquals(count, results.size()); IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i)); } @@ -229,7 +237,7 @@ public abstract class AbstractTestAsyncTableScan { if (limit > 0) { scan.setLimit(limit); } - List results = doScan(scan); + List results = doScan(scan, -1); int actualStart = startInclusive ? start : start - 1; int actualStop = stopInclusive ? stop - 1 : stop; int count = actualStart - actualStop; @@ -309,4 +317,13 @@ public abstract class AbstractTestAsyncTableScan { testReversedScan(765, false, 543, true, 200); testReversedScan(876, false, 654, false, 200); } + + @Test + public void testScanEndingEarly() throws Exception { + testScan(1, true, 998, false, 0, 900); // from first region to last region + testScan(123, true, 234, true, 0, 100); + testScan(234, true, 456, false, 0, 100); + testScan(345, false, 567, true, 0, 100); + testScan(456, false, 678, false, 0, 100); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java index 42d2c38376d..c1797f3833c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; @@ -29,6 +30,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; @RunWith(Parameterized.class) @Category({ LargeTests.class, ClientTests.class }) @@ -55,16 +57,74 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan { } @Override - protected List doScan(Scan scan) throws Exception { + protected List doScan(Scan scan, int closeAfter) throws Exception { AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); - SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); - table.scan(scan, consumer); - List results = consumer.getAll(); + List results; + if (closeAfter > 0) { + // these tests batch settings with the sample data result in each result being + // split in two. so we must allow twice the expected results in order to reach + // our true limit. see convertFromBatchResult for details. + if (scan.getBatch() > 0) { + closeAfter = closeAfter * 2; + } + LimitedScanResultConsumer consumer = new LimitedScanResultConsumer(closeAfter); + table.scan(scan, consumer); + results = consumer.getAll(); + } else { + SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); + table.scan(scan, consumer); + results = consumer.getAll(); + } if (scan.getBatch() > 0) { results = convertFromBatchResult(results); } return results; } + private static class LimitedScanResultConsumer implements ScanResultConsumer { + + private final int limit; + + public LimitedScanResultConsumer(int limit) { + this.limit = limit; + } + + private final List results = new ArrayList<>(); + + private Throwable error; + + private boolean finished = false; + + @Override + public synchronized boolean onNext(Result result) { + results.add(result); + return results.size() < limit; + } + + @Override + public synchronized void onError(Throwable error) { + this.error = error; + finished = true; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + finished = true; + notifyAll(); + } + + public synchronized List getAll() throws Exception { + while (!finished) { + wait(); + } + if (error != null) { + Throwables.propagateIfPossible(error, Exception.class); + throw new Exception(error); + } + return results; + } + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java index d9a53952ab8..96c2d40138c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java @@ -60,11 +60,16 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan { } @Override - protected List doScan(Scan scan) throws Exception { + protected List doScan(Scan scan, int closeAfter) throws Exception { List results = getTable.get().scanAll(scan).get(); if (scan.getBatch() > 0) { results = convertFromBatchResult(results); } + // we can't really close the scan early for scanAll, but to keep the assertions + // simple in AbstractTestAsyncTableScan we'll just sublist here instead. + if (closeAfter > 0 && closeAfter < results.size()) { + results = results.subList(0, closeAfter); + } return results; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java index f832cfd759a..2e990f763da 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java @@ -62,12 +62,21 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan { } @Override - protected List doScan(Scan scan) throws Exception { + protected List doScan(Scan scan, int closeAfter) throws Exception { AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); List results = new ArrayList<>(); + // these tests batch settings with the sample data result in each result being + // split in two. so we must allow twice the expected results in order to reach + // our true limit. see convertFromBatchResult for details. + if (closeAfter > 0 && scan.getBatch() > 0) { + closeAfter = closeAfter * 2; + } try (ResultScanner scanner = table.getScanner(scan)) { for (Result result; (result = scanner.next()) != null;) { results.add(result); + if (closeAfter > 0 && results.size() >= closeAfter) { + break; + } } } if (scan.getBatch() > 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java index 78a54edd24b..26c201e1986 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -55,12 +55,21 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { } @Override - protected List doScan(Scan scan) throws Exception { + protected List doScan(Scan scan, int closeAfter) throws Exception { BufferingScanResultConsumer scanConsumer = new BufferingScanResultConsumer(); ASYNC_CONN.getTable(TABLE_NAME).scan(scan, scanConsumer); List results = new ArrayList<>(); + // these tests batch settings with the sample data result in each result being + // split in two. so we must allow twice the expected results in order to reach + // our true limit. see convertFromBatchResult for details. + if (closeAfter > 0 && scan.getBatch() > 0) { + closeAfter = closeAfter * 2; + } for (Result result; (result = scanConsumer.take()) != null;) { results.add(result); + if (closeAfter > 0 && results.size() >= closeAfter) { + break; + } } if (scan.getBatch() > 0) { results = convertFromBatchResult(results);