HBASE-21907 Should set priority for rpc request

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
Duo Zhang 2019-02-18 18:25:30 +08:00 committed by zhangduo
parent 7be71c0f55
commit 6176471957
14 changed files with 1017 additions and 110 deletions

View File

@ -43,10 +43,10 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
private final Callable<T> callable;
private ServerName serverName;
public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
@ -45,6 +46,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@ -129,6 +131,11 @@ class AsyncBatchRpcRetryingCaller<T> {
computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
() -> new RegionRequest(loc)).actions.add(action);
}
public int getPriority() {
return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream())
.mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
}
}
public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
@ -148,7 +155,12 @@ class AsyncBatchRpcRetryingCaller<T> {
this.action2Future = new IdentityHashMap<>(actions.size());
for (int i = 0, n = actions.size(); i < n; i++) {
Row rawAction = actions.get(i);
Action action = new Action(rawAction, i);
Action action;
if (rawAction instanceof OperationWithAttributes) {
action = new Action(rawAction, i, ((OperationWithAttributes) rawAction).getPriority());
} else {
action = new Action(rawAction, i);
}
if (rawAction instanceof Append || rawAction instanceof Increment) {
action.setNonce(conn.getNonceGenerator().newNonce());
}
@ -341,7 +353,8 @@ class AsyncBatchRpcRetryingCaller<T> {
return;
}
HBaseRpcController controller = conn.rpcControllerFactory.newController();
resetController(controller, Math.min(rpcTimeoutNs, remainingNs));
resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
calcPriority(serverReq.getPriority(), tableName));
if (!cells.isEmpty()) {
controller.setCellScanner(createCellScanner(cells));
}

View File

@ -42,9 +42,9 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
private final Callable<T> callable;
public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
Callable<T> callable, int priority, long pauseNs, int maxRetries, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
super(retryTimer, conn, priority, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
this.callable = callable;
}

View File

@ -52,6 +52,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
private final Timer retryTimer;
private final int priority;
private final long startNs;
private final long pauseNs;
@ -74,10 +76,12 @@ public abstract class AsyncRpcRetryingCaller<T> {
protected final HBaseRpcController controller;
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs,
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.priority = priority;
this.pauseNs = pauseNs;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
@ -85,6 +89,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
this.startLogErrorsCnt = startLogErrorsCnt;
this.future = new CompletableFuture<>();
this.controller = conn.rpcControllerFactory.newController();
this.controller.setPriority(priority);
this.exceptions = new ArrayList<>();
this.startNs = System.nanoTime();
}
@ -113,7 +118,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
} else {
callTimeoutNs = rpcTimeoutNs;
}
resetController(controller, callTimeoutNs);
resetController(controller, callTimeoutNs, priority);
}
private void tryScheduleRetry(Throwable error, Consumer<Throwable> updateCachedLocation) {

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
@ -77,6 +79,8 @@ class AsyncRpcRetryingCallerFactory {
private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
private int priority = PRIORITY_UNSET;
public SingleRequestCallerBuilder<T> table(TableName tableName) {
this.tableName = tableName;
return this;
@ -128,12 +132,25 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public AsyncSingleRequestRpcRetryingCaller<T> build() {
public SingleRequestCallerBuilder<T> priority(int priority) {
this.priority = priority;
return this;
}
private void preCheck() {
checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId,
checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
checkNotNull(tableName, "tableName is null");
checkNotNull(row, "row is null");
checkNotNull(locateType, "locateType is null");
checkNotNull(callable, "action is null");
this.priority = calcPriority(priority, tableName);
}
public AsyncSingleRequestRpcRetryingCaller<T> build() {
preCheck();
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
locateType, callable, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
}
/**
@ -175,6 +192,8 @@ class AsyncRpcRetryingCallerFactory {
private long rpcTimeoutNs;
private int priority = PRIORITY_UNSET;
public ScanSingleRegionCallerBuilder id(long scannerId) {
this.scannerId = scannerId;
return this;
@ -182,6 +201,7 @@ class AsyncRpcRetryingCallerFactory {
public ScanSingleRegionCallerBuilder setScan(Scan scan) {
this.scan = scan;
this.priority = scan.getPriority();
return this;
}
@ -246,14 +266,22 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public AsyncScanSingleRegionRpcRetryingCaller build() {
private void preCheck() {
checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
checkNotNull(resultCache, "resultCache is null"),
checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
checkNotNull(scan, "scan is null");
checkNotNull(resultCache, "resultCache is null");
checkNotNull(consumer, "consumer is null");
checkNotNull(stub, "stub is null");
checkNotNull(loc, "location is null");
this.priority = calcPriority(priority, loc.getRegion().getTable());
}
public AsyncScanSingleRegionRpcRetryingCaller build() {
preCheck();
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
}
/**
@ -338,6 +366,8 @@ class AsyncRpcRetryingCallerFactory {
private long rpcTimeoutNs = -1L;
private int priority = PRIORITY_UNSET;
public MasterRequestCallerBuilder<T> action(
AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
this.callable = callable;
@ -369,10 +399,24 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public MasterRequestCallerBuilder<T> priority(TableName tableName) {
this.priority = Math.max(priority, ConnectionUtils.getPriority(tableName));
return this;
}
public MasterRequestCallerBuilder<T> priority(int priority) {
this.priority = Math.max(this.priority, priority);
return this;
}
private void preCheck() {
checkNotNull(callable, "action is null");
}
public AsyncMasterRequestRpcRetryingCaller<T> build() {
return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn,
checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
preCheck();
return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, priority,
pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
@ -398,6 +442,8 @@ class AsyncRpcRetryingCallerFactory {
private ServerName serverName;
private int priority;
public AdminRequestCallerBuilder<T> action(
AsyncAdminRequestRetryingCaller.Callable<T> callable) {
this.callable = callable;
@ -434,9 +480,14 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public AdminRequestCallerBuilder<T> priority(int priority) {
this.priority = priority;
return this;
}
public AsyncAdminRequestRetryingCaller<T> build() {
return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs,
maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
}

View File

@ -91,6 +91,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private final boolean regionServerRemote;
private final int priority;
private final long scannerLeaseTimeoutPeriodNs;
private final long pauseNs;
@ -298,11 +300,11 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
}
public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer,
AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId,
ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub,
HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs,
long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.scan = scan;
this.scanMetrics = scanMetrics;
@ -324,7 +326,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
}
this.future = new CompletableFuture<>();
this.priority = priority;
this.controller = conn.rpcControllerFactory.newController();
this.controller.setPriority(priority);
this.exceptions = new ArrayList<>();
}
@ -338,7 +342,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void closeScanner() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
resetController(controller, rpcTimeoutNs);
resetController(controller, rpcTimeoutNs, priority);
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
stub.scan(controller, req, resp -> {
if (controller.failed()) {
@ -558,7 +562,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
if (tries > 1) {
incRPCRetriesMetrics(scanMetrics, regionServerRemote);
}
resetController(controller, callTimeoutNs);
resetController(controller, callTimeoutNs, priority);
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
nextCallSeq, false, false, scan.getLimit());
stub.scan(controller, req, resp -> onComplete(controller, resp));
@ -575,7 +579,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void renewLease() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
nextCallSeq++;
resetController(controller, rpcTimeoutNs);
resetController(controller, rpcTimeoutNs, priority);
ScanRequest req =
RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
stub.scan(controller, req, resp -> {

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
@ -47,8 +48,8 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;
}

View File

@ -56,9 +56,9 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
Callable<T> callable, long pauseNs, int maxAttempts, long operationTimeoutNs,
Callable<T> callable, int priority, long pauseNs, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
this.tableName = tableName;
this.row = row;

View File

@ -298,12 +298,13 @@ public final class ConnectionUtils {
return Bytes.equals(row, EMPTY_END_ROW);
}
static void resetController(HBaseRpcController controller, long timeoutNs) {
static void resetController(HBaseRpcController controller, long timeoutNs, int priority) {
controller.reset();
if (timeoutNs >= 0) {
controller.setCallTimeout(
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs)));
}
controller.setPriority(priority);
}
static Throwable translateException(Throwable t) {
@ -588,4 +589,35 @@ public final class ConnectionUtils {
}
}
}
/**
* Select the priority for the rpc call.
* <p/>
* The rules are:
* <ol>
* <li>If user set a priority explicitly, then just use it.</li>
* <li>For meta table, use {@link HConstants#META_QOS}.</li>
* <li>For other system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
* <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li>
* </ol>
* @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}.
* @param tableName the table we operate on
*/
static int calcPriority(int priority, TableName tableName) {
if (priority != HConstants.PRIORITY_UNSET) {
return priority;
} else {
return getPriority(tableName);
}
}
static int getPriority(TableName tableName) {
if (TableName.isMetaTableName(tableName)) {
return HConstants.META_QOS;
} else if (tableName.isSystemTable()) {
return HConstants.SYSTEMTABLE_QOS;
} else {
return HConstants.NORMAL_QOS;
}
}
}

View File

@ -339,4 +339,9 @@ public class Put extends Mutation implements HeapSize {
public Put setTTL(long ttl) {
return (Put) super.setTTL(ttl);
}
@Override
public Put setPriority(int priority) {
return (Put) super.setPriority(priority);
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
@ -38,6 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@ -393,7 +395,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
Converter<RESP, PRESP> respConverter) {
CompletableFuture<RESP> future = new CompletableFuture<>();
rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
@ -416,9 +417,24 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
ProcedureBiConsumer consumer) {
CompletableFuture<Long> procFuture =
this.<Long> newMasterCaller().action((controller, stub) -> this
.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)).call();
return procedureCall(b -> {
}, preq, rpcCall, respConverter, consumer);
}
private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
ProcedureBiConsumer consumer) {
return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
}
private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
ProcedureBiConsumer consumer) {
MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
prioritySetter.accept(builder);
CompletableFuture<Long> procFuture = builder.call();
CompletableFuture<Void> future = waitProcedureResult(procFuture);
addListener(future, consumer);
return future;
@ -515,7 +531,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
addListener(this.<List<TableSchema>> newMasterCaller()
addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
.action((controller, stub) -> this
.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
@ -566,14 +582,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
Preconditions.checkNotNull(tableName, "table name is null");
return this.<CreateTableRequest, CreateTableResponse> procedureCall(request,
return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
(s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
new CreateTableProcedureBiConsumer(tableName));
}
@Override
public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(
return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
(resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
@ -581,15 +597,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> deleteTable(TableName tableName) {
return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
(s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
new DeleteTableProcedureBiConsumer(tableName));
}
@Override
public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(
return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
(resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
@ -597,16 +613,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> enableTable(TableName tableName) {
return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter
.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
(s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
new EnableTableProcedureBiConsumer(tableName));
}
@Override
public CompletableFuture<Void> disableTable(TableName tableName) {
return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter
.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
(s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
new DisableTableProcedureBiConsumer(tableName));
}
@ -725,7 +741,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
return this.<AddColumnRequest, AddColumnResponse> procedureCall(
return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
new AddColumnFamilyProcedureBiConsumer(tableName));
@ -733,7 +749,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(
return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
(resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
@ -742,7 +758,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
ColumnFamilyDescriptor columnFamily) {
return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(
return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
(resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
@ -1226,9 +1242,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
addListener(
this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
(s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
new MergeTableRegionProcedureBiConsumer(tableName)),
this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(tableName,
request, (s, c, req, done) -> s.mergeTableRegions(c, req, done),
(resp) -> resp.getProcId(), new MergeTableRegionProcedureBiConsumer(tableName)),
(ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
@ -1403,9 +1419,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return future;
}
addListener(this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(request,
(s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
new SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
addListener(
this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(tableName,
request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
new SplitTableRegionProcedureBiConsumer(tableName)),
(ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
@ -1423,7 +1441,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
future.completeExceptionally(err);
return;
}
addListener(this.<Void> newMasterCaller()
addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
.action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
(s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
@ -1447,7 +1465,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return;
}
addListener(
this.<Void> newMasterCaller()
this.<Void> newMasterCaller().priority(regionInfo.getTable())
.action(((controller, stub) -> this
.<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
@ -1473,7 +1491,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return;
}
addListener(
this.<Void> newMasterCaller()
this.<Void> newMasterCaller().priority(regionInfo.getTable())
.action(((controller, stub) -> this
.<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
@ -1499,7 +1517,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return;
}
addListener(
moveRegion(
moveRegion(regionInfo,
RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
(ret, err2) -> {
if (err2 != null) {
@ -1522,7 +1540,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
future.completeExceptionally(err);
return;
}
addListener(moveRegion(RequestConverter
addListener(
moveRegion(regionInfo, RequestConverter
.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
(ret, err2) -> {
if (err2 != null) {
@ -1535,12 +1554,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return future;
}
private CompletableFuture<Void> moveRegion(MoveRegionRequest request) {
return this
.<Void> newMasterCaller()
private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
return this.<Void> newMasterCaller().priority(regionInfo.getTable())
.action(
(controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)).call();
stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
.call();
}
@Override
@ -2704,35 +2723,31 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> shutdown() {
return this
.<Void> newMasterCaller()
.action(
(controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
stub, ShutdownRequest.newBuilder().build(),
(s, c, req, done) -> s.shutdown(c, req, done), resp -> null)).call();
return this.<Void> newMasterCaller().priority(HIGH_QOS)
.action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
resp -> null))
.call();
}
@Override
public CompletableFuture<Void> stopMaster() {
return this
.<Void> newMasterCaller()
.action(
(controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(controller,
stub, StopMasterRequest.newBuilder().build(),
(s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)).call();
return this.<Void> newMasterCaller().priority(HIGH_QOS)
.action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
controller, stub, StopMasterRequest.newBuilder().build(),
(s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
.call();
}
@Override
public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
StopServerRequest request =
RequestConverter.buildStopServerRequest("Called by admin client "
+ this.connection.toString());
return this
.<Void> newAdminCaller()
.action(
(controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
StopServerRequest request = RequestConverter
.buildStopServerRequest("Called by admin client " + this.connection.toString());
return this.<Void> newAdminCaller().priority(HIGH_QOS)
.action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
resp -> null)).serverName(serverName).call();
resp -> null))
.serverName(serverName).call();
}
@Override

View File

@ -206,20 +206,21 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
(info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
}
private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
return conn.callerFactory.<T> single().table(tableName).row(row)
private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) {
return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt);
}
private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
return newCaller(row.getRow(), rpcTimeoutNs);
private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> newCaller(
R row, long rpcTimeoutNs) {
return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
}
private CompletableFuture<Result> get(Get get, int replicaId) {
return this.<Result> newCaller(get, readRpcTimeoutNs)
return this.<Result, Get> newCaller(get, readRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl
.<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
@ -237,7 +238,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Void> put(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
return this.<Void> newCaller(put, writeRpcTimeoutNs)
return this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
put, RequestConverter::buildMutateRequest))
.call();
@ -245,7 +246,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Void> delete(Delete delete) {
return this.<Void> newCaller(delete, writeRpcTimeoutNs)
return this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
stub, delete, RequestConverter::buildMutateRequest))
.call();
@ -256,7 +257,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
checkHasFamilies(append);
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return this.<Result> newCaller(append, rpcTimeoutNs)
return this.<Result, Append> newCaller(append, rpcTimeoutNs)
.action(
(controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller,
loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
@ -268,7 +269,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
checkHasFamilies(increment);
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return this.<Result> newCaller(increment, rpcTimeoutNs)
return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
.action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
controller, loc, stub, increment, RequestConverter::buildMutateRequest,
RawAsyncTableImpl::toResult))
@ -330,7 +331,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@ -342,7 +343,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@ -354,7 +355,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(mutation, rpcTimeoutNs)
return RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@ -412,8 +414,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return this.<Void> newCaller(mutation, writeRpcTimeoutNs).action((controller, loc,
stub) -> RawAsyncTableImpl.<Void> mutateRow(controller, loc, stub, mutation, (rn, rm) -> {
return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Void> mutateRow(controller, loc, stub,
mutation, (rn, rm) -> {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();

View File

@ -0,0 +1,224 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
import static org.apache.hadoop.hbase.HConstants.META_QOS;
import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
/**
* Confirm that we will set the priority in {@link HBaseRpcController} for several admin operations.
*/
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncAdminRpcPriority {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncAdminRpcPriority.class);
private static Configuration CONF = HBaseConfiguration.create();
private MasterService.Interface masterStub;
private AdminService.Interface adminStub;
private AsyncConnection conn;
@Rule
public TestName name = new TestName();
@Before
public void setUp() throws IOException {
masterStub = mock(MasterService.Interface.class);
adminStub = mock(AdminService.Interface.class);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
RpcCallback<GetProcedureResultResponse> done = invocation.getArgument(2);
done.run(GetProcedureResultResponse.newBuilder()
.setState(GetProcedureResultResponse.State.FINISHED).build());
return null;
}
}).when(masterStub).getProcedureResult(any(HBaseRpcController.class),
any(GetProcedureResultRequest.class), any());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
RpcCallback<CreateTableResponse> done = invocation.getArgument(2);
done.run(CreateTableResponse.newBuilder().setProcId(1L).build());
return null;
}
}).when(masterStub).createTable(any(HBaseRpcController.class), any(CreateTableRequest.class),
any());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
RpcCallback<ShutdownResponse> done = invocation.getArgument(2);
done.run(ShutdownResponse.getDefaultInstance());
return null;
}
}).when(masterStub).shutdown(any(HBaseRpcController.class), any(ShutdownRequest.class), any());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
RpcCallback<StopMasterResponse> done = invocation.getArgument(2);
done.run(StopMasterResponse.getDefaultInstance());
return null;
}
}).when(masterStub).stopMaster(any(HBaseRpcController.class), any(StopMasterRequest.class),
any());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
RpcCallback<StopServerResponse> done = invocation.getArgument(2);
done.run(StopServerResponse.getDefaultInstance());
return null;
}
}).when(adminStub).stopServer(any(HBaseRpcController.class), any(StopServerRequest.class),
any());
conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test",
UserProvider.instantiate(CONF).getCurrent()) {
@Override
CompletableFuture<MasterService.Interface> getMasterStub() {
return CompletableFuture.completedFuture(masterStub);
}
@Override
Interface getAdminStub(ServerName serverName) throws IOException {
return adminStub;
}
};
}
private HBaseRpcController assertPriority(int priority) {
return argThat(new ArgumentMatcher<HBaseRpcController>() {
@Override
public boolean matches(HBaseRpcController controller) {
return controller.getPriority() == priority;
}
});
}
@Test
public void testCreateNormalTable() {
conn.getAdmin()
.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).build())
.join();
verify(masterStub, times(1)).createTable(assertPriority(NORMAL_QOS),
any(CreateTableRequest.class), any());
}
// a bit strange as we can not do this in production but anyway, just a client mock to confirm
// that we pass the correct priority
@Test
public void testCreateSystemTable() {
conn.getAdmin()
.createTable(TableDescriptorBuilder
.newBuilder(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).build())
.join();
verify(masterStub, times(1)).createTable(assertPriority(SYSTEMTABLE_QOS),
any(CreateTableRequest.class), any());
}
// a bit strange as we can not do this in production but anyway, just a client mock to confirm
// that we pass the correct priority
@Test
public void testCreateMetaTable() {
conn.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TableName.META_TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).build()).join();
verify(masterStub, times(1)).createTable(assertPriority(META_QOS),
any(CreateTableRequest.class), any());
}
@Test
public void testShutdown() {
conn.getAdmin().shutdown().join();
verify(masterStub, times(1)).shutdown(assertPriority(HIGH_QOS), any(ShutdownRequest.class),
any());
}
@Test
public void testStopMaster() {
conn.getAdmin().stopMaster().join();
verify(masterStub, times(1)).stopMaster(assertPriority(HIGH_QOS), any(StopMasterRequest.class),
any());
}
@Test
public void testStopRegionServer() {
conn.getAdmin().stopRegionServer(ServerName.valueOf("rs", 16010, 12345)).join();
verify(adminStub, times(1)).stopServer(assertPriority(HIGH_QOS), any(StopServerRequest.class),
any());
}
}

View File

@ -0,0 +1,554 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.META_QOS;
import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
/**
* Confirm that we will set the priority in {@link HBaseRpcController} for several table operations.
*/
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncTableRpcPriority {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncTableRpcPriority.class);
private static Configuration CONF = HBaseConfiguration.create();
private ClientService.Interface stub;
private AsyncConnection conn;
@Rule
public TestName name = new TestName();
@Before
public void setUp() throws IOException {
stub = mock(ClientService.Interface.class);
AtomicInteger scanNextCalled = new AtomicInteger(0);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ScanRequest req = invocation.getArgument(1);
RpcCallback<ScanResponse> done = invocation.getArgument(2);
if (!req.hasScannerId()) {
done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
.setMoreResultsInRegion(true).setMoreResults(true).build());
} else {
if (req.hasCloseScanner() && req.getCloseScanner()) {
done.run(ScanResponse.getDefaultInstance());
} else {
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
.setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
.setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
.setValue(Bytes.toBytes("v")).build();
Result result = Result.create(Arrays.asList(cell));
done.run(
ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true)
.setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build());
}
}
return null;
}
}).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ClientProtos.MultiResponse resp =
ClientProtos.MultiResponse.newBuilder()
.addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
.build();
RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
done.run(resp);
return null;
}
}).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
MutateResponse resp;
switch (req.getMutateType()) {
case INCREMENT:
ColumnValue value = req.getColumnValue(0);
QualifierValue qvalue = value.getQualifierValue(0);
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
.setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
.setQualifier(qvalue.getQualifier().toByteArray())
.setValue(qvalue.getValue().toByteArray()).build();
resp = MutateResponse.newBuilder()
.setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
break;
default:
resp = MutateResponse.getDefaultInstance();
break;
}
RpcCallback<MutateResponse> done = invocation.getArgument(2);
done.run(resp);
return null;
}
}).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
RpcCallback<GetResponse> done = invocation.getArgument(2);
done.run(GetResponse.getDefaultInstance());
return null;
}
}).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test",
UserProvider.instantiate(CONF).getCurrent()) {
@Override
AsyncRegionLocator getLocator() {
AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
Answer<CompletableFuture<HRegionLocation>> answer =
new Answer<CompletableFuture<HRegionLocation>>() {
@Override
public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
throws Throwable {
TableName tableName = invocation.getArgument(0);
RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
HRegionLocation loc = new HRegionLocation(info, serverName);
return CompletableFuture.completedFuture(loc);
}
};
doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
any(RegionLocateType.class), anyLong());
doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
anyInt(), any(RegionLocateType.class), anyLong());
return locator;
}
@Override
ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
return stub;
}
};
}
private HBaseRpcController assertPriority(int priority) {
return argThat(new ArgumentMatcher<HBaseRpcController>() {
@Override
public boolean matches(HBaseRpcController controller) {
return controller.getPriority() == priority;
}
});
}
@Test
public void testGet() {
conn.getTable(TableName.valueOf(name.getMethodName()))
.get(new Get(Bytes.toBytes(0)).setPriority(11)).join();
verify(stub, times(1)).get(assertPriority(11), any(GetRequest.class), any());
}
@Test
public void testGetNormalTable() {
conn.getTable(TableName.valueOf(name.getMethodName())).get(new Get(Bytes.toBytes(0))).join();
verify(stub, times(1)).get(assertPriority(NORMAL_QOS), any(GetRequest.class), any());
}
@Test
public void testGetSystemTable() {
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.get(new Get(Bytes.toBytes(0))).join();
verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any());
}
@Test
public void testGetMetaTable() {
conn.getTable(TableName.META_TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
verify(stub, times(1)).get(assertPriority(META_QOS), any(GetRequest.class), any());
}
@Test
public void testPut() {
conn
.getTable(TableName.valueOf(name.getMethodName())).put(new Put(Bytes.toBytes(0))
.setPriority(12).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
.join();
verify(stub, times(1)).mutate(assertPriority(12), any(MutateRequest.class), any());
}
@Test
public void testPutNormalTable() {
conn.getTable(TableName.valueOf(name.getMethodName())).put(new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
}
@Test
public void testPutSystemTable() {
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
Bytes.toBytes("v")))
.join();
verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
}
@Test
public void testPutMetaTable() {
conn.getTable(TableName.META_TABLE_NAME).put(new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
}
@Test
public void testDelete() {
conn.getTable(TableName.valueOf(name.getMethodName()))
.delete(new Delete(Bytes.toBytes(0)).setPriority(13)).join();
verify(stub, times(1)).mutate(assertPriority(13), any(MutateRequest.class), any());
}
@Test
public void testDeleteNormalTable() {
conn.getTable(TableName.valueOf(name.getMethodName())).delete(new Delete(Bytes.toBytes(0)))
.join();
verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
}
@Test
public void testDeleteSystemTable() {
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.delete(new Delete(Bytes.toBytes(0))).join();
verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
}
@Test
public void testDeleteMetaTable() {
conn.getTable(TableName.META_TABLE_NAME).delete(new Delete(Bytes.toBytes(0))).join();
verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
}
@Test
public void testAppend() {
conn
.getTable(TableName.valueOf(name.getMethodName())).append(new Append(Bytes.toBytes(0))
.setPriority(14).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
.join();
verify(stub, times(1)).mutate(assertPriority(14), any(MutateRequest.class), any());
}
@Test
public void testAppendNormalTable() {
conn.getTable(TableName.valueOf(name.getMethodName())).append(new Append(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
}
@Test
public void testAppendSystemTable() {
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
Bytes.toBytes("v")))
.join();
verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
}
@Test
public void testAppendMetaTable() {
conn.getTable(TableName.META_TABLE_NAME).append(new Append(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
}
@Test
public void testIncrement() {
conn.getTable(TableName.valueOf(name.getMethodName())).increment(new Increment(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).setPriority(15)).join();
verify(stub, times(1)).mutate(assertPriority(15), any(MutateRequest.class), any());
}
@Test
public void testIncrementNormalTable() {
conn.getTable(TableName.valueOf(name.getMethodName()))
.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
}
@Test
public void testIncrementSystemTable() {
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
}
@Test
public void testIncrementMetaTable() {
conn.getTable(TableName.META_TABLE_NAME)
.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
}
@Test
public void testCheckAndPut() {
conn.getTable(TableName.valueOf(name.getMethodName()))
.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
.ifNotExists()
.thenPut(new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")).setPriority(16))
.join();
verify(stub, times(1)).mutate(assertPriority(16), any(MutateRequest.class), any());
}
@Test
public void testCheckAndPutNormalTable() {
conn.getTable(TableName.valueOf(name.getMethodName()))
.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
.ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))
.join();
verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
}
@Test
public void testCheckAndPutSystemTable() {
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
.ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))
.join();
verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
}
@Test
public void testCheckAndPutMetaTable() {
conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
.qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
.join();
verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
}
@Test
public void testCheckAndDelete() {
conn.getTable(TableName.valueOf(name.getMethodName()))
.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
.ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0)).setPriority(17)).join();
verify(stub, times(1)).mutate(assertPriority(17), any(MutateRequest.class), any());
}
@Test
public void testCheckAndDeleteNormalTable() {
conn.getTable(TableName.valueOf(name.getMethodName()))
.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
.ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join();
verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
}
@Test
public void testCheckAndDeleteSystemTable() {
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
.ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join();
verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
}
@Test
public void testCheckAndDeleteMetaTable() {
conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
.qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
.join();
verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any());
}
@Test
public void testCheckAndMutate() throws IOException {
conn.getTable(TableName.valueOf(name.getMethodName()))
.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
.ifEquals(Bytes.toBytes("v")).thenMutate(new RowMutations(Bytes.toBytes(0))
.add((Mutation) new Delete(Bytes.toBytes(0)).setPriority(18)))
.join();
verify(stub, times(1)).multi(assertPriority(18), any(ClientProtos.MultiRequest.class), any());
}
@Test
public void testCheckAndMutateNormalTable() throws IOException {
conn.getTable(TableName.valueOf(name.getMethodName()))
.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
.ifEquals(Bytes.toBytes("v"))
.thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
.join();
verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
any());
}
@Test
public void testCheckAndMutateSystemTable() throws IOException {
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
.ifEquals(Bytes.toBytes("v"))
.thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
.join();
verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
any(ClientProtos.MultiRequest.class), any());
}
@Test
public void testCheckAndMutateMetaTable() throws IOException {
conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
.qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
.thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
.join();
verify(stub, times(1)).multi(assertPriority(META_QOS), any(ClientProtos.MultiRequest.class),
any());
}
@Test
public void testScan() throws IOException, InterruptedException {
try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName()))
.getScanner(new Scan().setCaching(1).setMaxResultSize(1).setPriority(19))) {
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// open, next, several renew lease, and then close
verify(stub, atLeast(4)).scan(assertPriority(19), any(ScanRequest.class), any());
}
@Test
public void testScanNormalTable() throws IOException, InterruptedException {
try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName()))
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// open, next, several renew lease, and then close
verify(stub, atLeast(4)).scan(assertPriority(NORMAL_QOS), any(ScanRequest.class), any());
}
@Test
public void testScanSystemTable() throws IOException, InterruptedException {
try (ResultScanner scanner =
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// open, next, several renew lease, and then close
verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any());
}
@Test
public void testScanMetaTable() throws IOException, InterruptedException {
try (ResultScanner scanner = conn.getTable(TableName.META_TABLE_NAME)
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// open, next, several renew lease, and then close
verify(stub, atLeast(4)).scan(assertPriority(META_QOS), any(ScanRequest.class), any());
}
@Test
public void testBatchNormalTable() {
conn.getTable(TableName.valueOf(name.getMethodName()))
.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
any());
}
@Test
public void testBatchSystemTable() {
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
any(ClientProtos.MultiRequest.class), any());
}
@Test
public void testBatchMetaTable() {
conn.getTable(TableName.META_TABLE_NAME).batchAll(Arrays.asList(new Delete(Bytes.toBytes(0))))
.join();
verify(stub, times(1)).multi(assertPriority(META_QOS), any(ClientProtos.MultiRequest.class),
any());
}
}