HBASE-26784 Addendum: Close scanner request should properly inherit original timeout and priority (#4183)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
6facc82585
commit
b1a7c6210f
|
@ -312,13 +312,19 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
|
||||||
ScanRequest request =
|
ScanRequest request =
|
||||||
RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
|
RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
|
||||||
HBaseRpcController controller = rpcControllerFactory.newController();
|
HBaseRpcController controller = rpcControllerFactory.newController();
|
||||||
// pull in the original priority, but then try to set to HIGH.
|
|
||||||
// it will take whatever is highest.
|
// Set fields from the original controller onto the close-specific controller
|
||||||
controller.setPriority(controller.getPriority());
|
// We set the timeout and the priority -- we will overwrite the priority to HIGH
|
||||||
controller.setPriority(HConstants.HIGH_QOS);
|
// below, but the controller will take whichever is highest.
|
||||||
if (controller.hasCallTimeout()) {
|
if (getRpcController() instanceof HBaseRpcController) {
|
||||||
controller.setCallTimeout(controller.getCallTimeout());
|
HBaseRpcController original = (HBaseRpcController) getRpcController();
|
||||||
|
controller.setPriority(original.getPriority());
|
||||||
|
if (original.hasCallTimeout()) {
|
||||||
|
controller.setCallTimeout(original.getCallTimeout());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
controller.setPriority(HConstants.HIGH_QOS);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
getStub().scan(controller, request);
|
getStub().scan(controller, request);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -262,4 +262,11 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
|
||||||
action.run(false);
|
action.run(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public String toString() {
|
||||||
|
return "HBaseRpcControllerImpl{" + "callTimeout=" + callTimeout + ", done=" + done
|
||||||
|
+ ", cancelled=" + cancelled + ", cancellationCbs=" + cancellationCbs + ", exception="
|
||||||
|
+ exception + ", regionInfo=" + regionInfo + ", priority=" + priority + ", cellScanner="
|
||||||
|
+ cellScanner + '}';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,14 +32,12 @@ import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellBuilderFactory;
|
import org.apache.hadoop.hbase.CellBuilderFactory;
|
||||||
|
@ -74,6 +72,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||||
*/
|
*/
|
||||||
@Category({ ClientTests.class, MediumTests.class })
|
@Category({ ClientTests.class, MediumTests.class })
|
||||||
public class TestTableRpcPriority {
|
public class TestTableRpcPriority {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestTableRpcPriority.class);
|
HBaseClassTestRule.forClass(TestTableRpcPriority.class);
|
||||||
|
@ -89,6 +88,7 @@ public class TestTableRpcPriority {
|
||||||
stub = mock(ClientProtos.ClientService.BlockingInterface.class);
|
stub = mock(ClientProtos.ClientService.BlockingInterface.class);
|
||||||
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
|
||||||
ExecutorService executorService = Executors.newCachedThreadPool();
|
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||||
conn = new ConnectionImplementation(conf, executorService,
|
conn = new ConnectionImplementation(conf, executorService,
|
||||||
UserProvider.instantiate(conf).getCurrent(), new DoNothingConnectionRegistry(conf)) {
|
UserProvider.instantiate(conf).getCurrent(), new DoNothingConnectionRegistry(conf)) {
|
||||||
|
@ -122,6 +122,16 @@ public class TestTableRpcPriority {
|
||||||
testForTable(TableName.valueOf(name.getMethodName()), Optional.of(19));
|
testForTable(TableName.valueOf(name.getMethodName()), Optional.of(19));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test verifies that our closeScanner request honors the original
|
||||||
|
* priority of the scan if it's greater than our expected HIGH_QOS for close calls.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testScanSuperHighPriority() throws Exception {
|
||||||
|
mockScan(1000);
|
||||||
|
testForTable(TableName.valueOf(name.getMethodName()), Optional.of(1000));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testScanNormalTable() throws Exception {
|
public void testScanNormalTable() throws Exception {
|
||||||
mockScan(NORMAL_QOS);
|
mockScan(NORMAL_QOS);
|
||||||
|
@ -153,11 +163,22 @@ public class TestTableRpcPriority {
|
||||||
// just verify that the calls happened. verification of priority occurred in the mocking
|
// just verify that the calls happened. verification of priority occurred in the mocking
|
||||||
// open, next, then several renew lease
|
// open, next, then several renew lease
|
||||||
verify(stub, atLeast(3)).scan(any(), any(ClientProtos.ScanRequest.class));
|
verify(stub, atLeast(3)).scan(any(), any(ClientProtos.ScanRequest.class));
|
||||||
verify(stub, times(1)).scan(any(), assertScannerCloseRequest());
|
verify(stub, times(1)).scan(
|
||||||
|
assertControllerArgs(Math.max(priority.orElse(0), HIGH_QOS)), assertScannerCloseRequest());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void mockScan(int scanPriority) throws ServiceException {
|
private void mockScan(int scanPriority) throws ServiceException {
|
||||||
int scannerId = 1;
|
int scannerId = 1;
|
||||||
|
|
||||||
|
doAnswer(new Answer<ClientProtos.ScanResponse>() {
|
||||||
|
@Override public ClientProtos.ScanResponse answer(InvocationOnMock invocation)
|
||||||
|
throws Throwable {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Call not covered by explicit mock for arguments controller="
|
||||||
|
+ invocation.getArgument(0) + ", request=" + invocation.getArgument(1));
|
||||||
|
}
|
||||||
|
}).when(stub).scan(any(), any());
|
||||||
|
|
||||||
AtomicInteger scanNextCalled = new AtomicInteger(0);
|
AtomicInteger scanNextCalled = new AtomicInteger(0);
|
||||||
doAnswer(new Answer<ClientProtos.ScanResponse>() {
|
doAnswer(new Answer<ClientProtos.ScanResponse>() {
|
||||||
|
|
||||||
|
@ -182,7 +203,7 @@ public class TestTableRpcPriority {
|
||||||
return builder.setTtl(800).setMoreResultsInRegion(true).setMoreResults(true)
|
return builder.setTtl(800).setMoreResultsInRegion(true).setMoreResults(true)
|
||||||
.addResults(ProtobufUtil.toResult(result)).build();
|
.addResults(ProtobufUtil.toResult(result)).build();
|
||||||
}
|
}
|
||||||
}).when(stub).scan(assertPriority(scanPriority), any(ClientProtos.ScanRequest.class));
|
}).when(stub).scan(assertControllerArgs(scanPriority), any());
|
||||||
|
|
||||||
doAnswer(new Answer<ClientProtos.ScanResponse>() {
|
doAnswer(new Answer<ClientProtos.ScanResponse>() {
|
||||||
|
|
||||||
|
@ -197,15 +218,19 @@ public class TestTableRpcPriority {
|
||||||
|
|
||||||
return ClientProtos.ScanResponse.getDefaultInstance();
|
return ClientProtos.ScanResponse.getDefaultInstance();
|
||||||
}
|
}
|
||||||
}).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest());
|
}).when(stub).scan(assertControllerArgs(Math.max(scanPriority, HIGH_QOS)),
|
||||||
|
assertScannerCloseRequest());
|
||||||
}
|
}
|
||||||
|
|
||||||
private HBaseRpcController assertPriority(int priority) {
|
private HBaseRpcController assertControllerArgs(int priority) {
|
||||||
return argThat(new ArgumentMatcher<HBaseRpcController>() {
|
return argThat(new ArgumentMatcher<HBaseRpcController>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean matches(HBaseRpcController controller) {
|
public boolean matches(HBaseRpcController controller) {
|
||||||
return controller.getPriority() == priority;
|
// check specified priority, but also check that it has a timeout
|
||||||
|
// this ensures that our conversion from the base controller to the close-specific
|
||||||
|
// controller honored the original arguments.
|
||||||
|
return controller.getPriority() == priority && controller.hasCallTimeout();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue