HBASE-26784 Use HIGH_QOS for ResultScanner.close requests (#4146)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
Bryan Beaudreault 2022-03-07 21:48:57 -05:00 committed by GitHub
parent bcd9a9acef
commit 39ecaa1975
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 237 additions and 81 deletions

View File

@ -188,6 +188,7 @@ class AsyncClientScanner {
private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)

View File

@ -36,6 +36,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
@ -347,7 +348,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void closeScanner() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
resetController(controller, rpcTimeoutNs, priority);
resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
stub.scan(controller, req, resp -> {
if (controller.failed()) {

View File

@ -17,10 +17,14 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@ -33,8 +37,13 @@ import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
@ -59,9 +68,7 @@ import org.junit.rules.TestName;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
@ -91,6 +98,8 @@ public class TestAsyncTableRpcPriority {
private ClientService.Interface stub;
private ExecutorService threadPool;
private AsyncConnection conn;
@Rule
@ -98,34 +107,9 @@ public class TestAsyncTableRpcPriority {
@Before
public void setUp() throws IOException {
this.threadPool = Executors.newSingleThreadExecutor();
stub = mock(ClientService.Interface.class);
AtomicInteger scanNextCalled = new AtomicInteger(0);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ScanRequest req = invocation.getArgument(1);
RpcCallback<ScanResponse> done = invocation.getArgument(2);
if (!req.hasScannerId()) {
done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
.setMoreResultsInRegion(true).setMoreResults(true).build());
} else {
if (req.hasCloseScanner() && req.getCloseScanner()) {
done.run(ScanResponse.getDefaultInstance());
} else {
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
.setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
.setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
.setValue(Bytes.toBytes("v")).build();
Result result = Result.create(Arrays.asList(cell));
done.run(
ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true)
.setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build());
}
}
return null;
}
}).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
doAnswer(new Answer<Void>() {
@Override
@ -218,6 +202,16 @@ public class TestAsyncTableRpcPriority {
});
}
private ScanRequest assertScannerCloseRequest() {
return argThat(new ArgumentMatcher<ScanRequest>() {
@Override
public boolean matches(ScanRequest request) {
return request.hasCloseScanner() && request.getCloseScanner();
}
});
}
@Test
public void testGet() {
conn.getTable(TableName.valueOf(name.getMethodName()))
@ -478,53 +472,113 @@ public class TestAsyncTableRpcPriority {
any(ClientProtos.MultiRequest.class), any());
}
@Test
public void testScan() throws IOException, InterruptedException {
try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName()))
.getScanner(new Scan().setCaching(1).setMaxResultSize(1).setPriority(19))) {
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// open, next, several renew lease, and then close
verify(stub, atLeast(4)).scan(assertPriority(19), any(ScanRequest.class), any());
private CompletableFuture<Void> mockScanReturnRenewFuture(int scanPriority) {
int scannerId = 1;
CompletableFuture<Void> future = new CompletableFuture<>();
AtomicInteger scanNextCalled = new AtomicInteger(0);
doAnswer(new Answer<Void>() {
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
threadPool.submit(() -> {
ScanRequest req = invocation.getArgument(1);
RpcCallback<ScanResponse> done = invocation.getArgument(2);
if (!req.hasScannerId()) {
done.run(ScanResponse.newBuilder()
.setScannerId(scannerId).setTtl(800)
.setMoreResultsInRegion(true).setMoreResults(true)
.build());
} else {
if (req.hasRenew() && req.getRenew()) {
future.complete(null);
}
assertFalse("close scanner should not come in with scan priority " + scanPriority,
req.hasCloseScanner() && req.getCloseScanner());
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setType(Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
.setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
.setValue(Bytes.toBytes("v")).build();
Result result = Result.create(Arrays.asList(cell));
done.run(
ScanResponse.newBuilder()
.setScannerId(scannerId).setTtl(800).setMoreResultsInRegion(true)
.setMoreResults(true).addResults(ProtobufUtil.toResult(result))
.build());
}
});
return null;
}
}).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any());
doAnswer(new Answer<Void>() {
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
threadPool.submit(() ->{
ScanRequest req = invocation.getArgument(1);
RpcCallback<ScanResponse> done = invocation.getArgument(2);
assertTrue("close request should have scannerId", req.hasScannerId());
assertEquals("close request's scannerId should match", scannerId, req.getScannerId());
assertTrue("close request should have closerScanner set",
req.hasCloseScanner() && req.getCloseScanner());
done.run(ScanResponse.getDefaultInstance());
});
return null;
}
}).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any());
return future;
}
@Test
public void testScanNormalTable() throws IOException, InterruptedException {
try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName()))
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// open, next, several renew lease, and then close
verify(stub, atLeast(4)).scan(assertPriority(NORMAL_QOS), any(ScanRequest.class), any());
public void testScan() throws Exception {
CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(19);
testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(19));
}
@Test
public void testScanSystemTable() throws IOException, InterruptedException {
try (ResultScanner scanner =
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// open, next, several renew lease, and then close
verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any());
public void testScanNormalTable() throws Exception {
CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(NORMAL_QOS);
testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(NORMAL_QOS));
}
@Test
public void testScanMetaTable() throws IOException, InterruptedException {
try (ResultScanner scanner = conn.getTable(TableName.META_TABLE_NAME)
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
public void testScanSystemTable() throws Exception {
CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()),
renewFuture, Optional.empty());
}
@Test
public void testScanMetaTable() throws Exception {
CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
testForTable(TableName.META_TABLE_NAME, renewFuture, Optional.empty());
}
private void testForTable(TableName tableName, CompletableFuture<Void> renewFuture,
Optional<Integer> priority) throws Exception {
Scan scan = new Scan().setCaching(1).setMaxResultSize(1);
priority.ifPresent(scan::setPriority);
try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
assertNotNull(scanner.next());
Thread.sleep(1000);
// wait for at least one renew to come in before closing
renewFuture.join();
}
Thread.sleep(1000);
// open, next, several renew lease, and then close
verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any());
// ensures the close thread has time to finish before asserting
threadPool.shutdown();
threadPool.awaitTermination(5, TimeUnit.SECONDS);
// just verify that the calls happened. verification of priority occurred in the mocking
// open, next, then one or more lease renewals, then close
verify(stub, atLeast(4)).scan(any(), any(ScanRequest.class), any());
// additionally, explicitly check for a close request
verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any());
}
@Test

View File

@ -129,7 +129,7 @@ public abstract class AbstractTestAsyncTableScan {
protected abstract Scan createScan();
protected abstract List<Result> doScan(Scan scan) throws Exception;
protected abstract List<Result> doScan(Scan scan, int closeAfter) throws Exception;
protected final List<Result> convertFromBatchResult(List<Result> results) {
assertTrue(results.size() % 2 == 0);
@ -145,7 +145,7 @@ public abstract class AbstractTestAsyncTableScan {
@Test
public void testScanAll() throws Exception {
List<Result> results = doScan(createScan());
List<Result> results = doScan(createScan(), -1);
// make sure all scanners are closed at RS side
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
.forEach(
@ -169,7 +169,7 @@ public abstract class AbstractTestAsyncTableScan {
@Test
public void testReversedScanAll() throws Exception {
List<Result> results = doScan(createScan().setReversed(true));
List<Result> results = doScan(createScan().setReversed(true), -1);
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
}
@ -178,7 +178,7 @@ public abstract class AbstractTestAsyncTableScan {
public void testScanNoStopKey() throws Exception {
int start = 345;
List<Result> results =
doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))));
doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1);
assertEquals(COUNT - start, results.size());
IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
}
@ -187,7 +187,7 @@ public abstract class AbstractTestAsyncTableScan {
public void testReverseScanNoStopKey() throws Exception {
int start = 765;
List<Result> results = doScan(
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true));
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true), -1);
assertEquals(start + 1, results.size());
IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
}
@ -195,7 +195,7 @@ public abstract class AbstractTestAsyncTableScan {
@Test
public void testScanWrongColumnFamily() throws Exception {
try {
doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")));
doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1);
} catch (Exception e) {
assertTrue(e instanceof NoSuchColumnFamilyException ||
e.getCause() instanceof NoSuchColumnFamilyException);
@ -203,20 +203,28 @@ public abstract class AbstractTestAsyncTableScan {
}
private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
int limit) throws Exception {
int limit) throws Exception {
testScan(start, startInclusive, stop, stopInclusive, limit, -1);
}
private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
int limit, int closeAfter) throws Exception {
Scan scan =
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
if (limit > 0) {
scan.setLimit(limit);
}
List<Result> results = doScan(scan);
List<Result> results = doScan(scan, closeAfter);
int actualStart = startInclusive ? start : start + 1;
int actualStop = stopInclusive ? stop + 1 : stop;
int count = actualStop - actualStart;
if (limit > 0) {
count = Math.min(count, limit);
}
if (closeAfter > 0) {
count = Math.min(count, closeAfter);
}
assertEquals(count, results.size());
IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i));
}
@ -229,7 +237,7 @@ public abstract class AbstractTestAsyncTableScan {
if (limit > 0) {
scan.setLimit(limit);
}
List<Result> results = doScan(scan);
List<Result> results = doScan(scan, -1);
int actualStart = startInclusive ? start : start - 1;
int actualStop = stopInclusive ? stop - 1 : stop;
int count = actualStart - actualStop;
@ -309,4 +317,13 @@ public abstract class AbstractTestAsyncTableScan {
testReversedScan(765, false, 543, true, 200);
testReversedScan(876, false, 654, false, 200);
}
@Test
public void testScanEndingEarly() throws Exception {
testScan(1, true, 998, false, 0, 900); // from first region to last region
testScan(123, true, 234, true, 0, 100);
testScan(234, true, 456, false, 0, 100);
testScan(345, false, 567, true, 0, 100);
testScan(456, false, 678, false, 0, 100);
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
@ -29,6 +30,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
@ -55,16 +57,74 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
}
@Override
protected List<Result> doScan(Scan scan) throws Exception {
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
AsyncTable<ScanResultConsumer> table =
ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
table.scan(scan, consumer);
List<Result> results = consumer.getAll();
List<Result> results;
if (closeAfter > 0) {
// these tests batch settings with the sample data result in each result being
// split in two. so we must allow twice the expected results in order to reach
// our true limit. see convertFromBatchResult for details.
if (scan.getBatch() > 0) {
closeAfter = closeAfter * 2;
}
LimitedScanResultConsumer consumer = new LimitedScanResultConsumer(closeAfter);
table.scan(scan, consumer);
results = consumer.getAll();
} else {
SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
table.scan(scan, consumer);
results = consumer.getAll();
}
if (scan.getBatch() > 0) {
results = convertFromBatchResult(results);
}
return results;
}
private static class LimitedScanResultConsumer implements ScanResultConsumer {
private final int limit;
public LimitedScanResultConsumer(int limit) {
this.limit = limit;
}
private final List<Result> results = new ArrayList<>();
private Throwable error;
private boolean finished = false;
@Override
public synchronized boolean onNext(Result result) {
results.add(result);
return results.size() < limit;
}
@Override
public synchronized void onError(Throwable error) {
this.error = error;
finished = true;
notifyAll();
}
@Override
public synchronized void onComplete() {
finished = true;
notifyAll();
}
public synchronized List<Result> getAll() throws Exception {
while (!finished) {
wait();
}
if (error != null) {
Throwables.propagateIfPossible(error, Exception.class);
throw new Exception(error);
}
return results;
}
}
}

View File

@ -60,11 +60,16 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
}
@Override
protected List<Result> doScan(Scan scan) throws Exception {
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
List<Result> results = getTable.get().scanAll(scan).get();
if (scan.getBatch() > 0) {
results = convertFromBatchResult(results);
}
// we can't really close the scan early for scanAll, but to keep the assertions
// simple in AbstractTestAsyncTableScan we'll just sublist here instead.
if (closeAfter > 0 && closeAfter < results.size()) {
results = results.subList(0, closeAfter);
}
return results;
}
}

View File

@ -62,12 +62,21 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
}
@Override
protected List<Result> doScan(Scan scan) throws Exception {
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
List<Result> results = new ArrayList<>();
// these tests batch settings with the sample data result in each result being
// split in two. so we must allow twice the expected results in order to reach
// our true limit. see convertFromBatchResult for details.
if (closeAfter > 0 && scan.getBatch() > 0) {
closeAfter = closeAfter * 2;
}
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result; (result = scanner.next()) != null;) {
results.add(result);
if (closeAfter > 0 && results.size() >= closeAfter) {
break;
}
}
}
if (scan.getBatch() > 0) {

View File

@ -55,12 +55,21 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
}
@Override
protected List<Result> doScan(Scan scan) throws Exception {
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
BufferingScanResultConsumer scanConsumer = new BufferingScanResultConsumer();
ASYNC_CONN.getTable(TABLE_NAME).scan(scan, scanConsumer);
List<Result> results = new ArrayList<>();
// these tests batch settings with the sample data result in each result being
// split in two. so we must allow twice the expected results in order to reach
// our true limit. see convertFromBatchResult for details.
if (closeAfter > 0 && scan.getBatch() > 0) {
closeAfter = closeAfter * 2;
}
for (Result result; (result = scanConsumer.take()) != null;) {
results.add(result);
if (closeAfter > 0 && results.size() >= closeAfter) {
break;
}
}
if (scan.getBatch() > 0) {
results = convertFromBatchResult(results);