HBASE-21159 Add shell command to switch throttle on or off
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
2bd1b28d5e
commit
c28e03e5df
|
@ -2757,4 +2757,17 @@ public interface Admin extends Abortable, Closeable {
|
||||||
*/
|
*/
|
||||||
void cloneTableSchema(final TableName tableName, final TableName newTableName,
|
void cloneTableSchema(final TableName tableName, final TableName newTableName,
|
||||||
final boolean preserveSplits) throws IOException;
|
final boolean preserveSplits) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Switch the rpc throttle enable state.
|
||||||
|
* @param enable Set to <code>true</code> to enable, <code>false</code> to disable.
|
||||||
|
* @return Previous rpc throttle enabled value
|
||||||
|
*/
|
||||||
|
boolean switchRpcThrottle(final boolean enable) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get if the rpc throttle is enabled.
|
||||||
|
* @return True if rpc throttle is enabled
|
||||||
|
*/
|
||||||
|
boolean isRpcThrottleEnabled() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1256,4 +1256,17 @@ public interface AsyncAdmin {
|
||||||
*/
|
*/
|
||||||
CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
|
CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
|
||||||
List<String> serverNamesList);
|
List<String> serverNamesList);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Switch the rpc throttle enabled state.
|
||||||
|
* @param enable Set to <code>true</code> to enable, <code>false</code> to disable.
|
||||||
|
* @return Previous rpc throttle enabled value
|
||||||
|
*/
|
||||||
|
CompletableFuture<Boolean> switchRpcThrottle(boolean enable);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get if the rpc throttle is enabled.
|
||||||
|
* @return True if rpc throttle is enabled
|
||||||
|
*/
|
||||||
|
CompletableFuture<Boolean> isRpcThrottleEnabled();
|
||||||
}
|
}
|
||||||
|
|
|
@ -758,4 +758,14 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
||||||
List<String> serverNamesList) {
|
List<String> serverNamesList) {
|
||||||
return wrap(rawAdmin.compactionSwitch(switchState, serverNamesList));
|
return wrap(rawAdmin.compactionSwitch(switchState, serverNamesList));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
|
||||||
|
return wrap(rawAdmin.switchRpcThrottle(enable));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Boolean> isRpcThrottleEnabled() {
|
||||||
|
return wrap(rawAdmin.isRpcThrottleEnabled());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,6 +101,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancer
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
|
||||||
|
@ -111,6 +113,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCa
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
|
||||||
|
@ -1748,6 +1752,18 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
MasterProtos.ClearDeadServersRequest request) throws ServiceException {
|
MasterProtos.ClearDeadServersRequest request) throws ServiceException {
|
||||||
return stub.clearDeadServers(controller, request);
|
return stub.clearDeadServers(controller, request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller,
|
||||||
|
SwitchRpcThrottleRequest request) throws ServiceException {
|
||||||
|
return stub.switchRpcThrottle(controller, request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller,
|
||||||
|
IsRpcThrottleEnabledRequest request) throws ServiceException {
|
||||||
|
return stub.isRpcThrottleEnabled(controller, request);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -172,6 +172,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMainte
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
|
||||||
|
@ -4315,4 +4316,28 @@ public class HBaseAdmin implements Admin {
|
||||||
createTable(htd);
|
createTable(htd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean switchRpcThrottle(final boolean enable) throws IOException {
|
||||||
|
return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
|
||||||
|
@Override
|
||||||
|
protected Boolean rpcCall() throws Exception {
|
||||||
|
return this.master
|
||||||
|
.switchRpcThrottle(getRpcController(), MasterProtos.SwitchRpcThrottleRequest
|
||||||
|
.newBuilder().setRpcThrottleEnabled(enable).build())
|
||||||
|
.getPreviousRpcThrottleEnabled();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRpcThrottleEnabled() throws IOException {
|
||||||
|
return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
|
||||||
|
@Override
|
||||||
|
protected Boolean rpcCall() throws Exception {
|
||||||
|
return this.master.isRpcThrottleEnabled(getRpcController(),
|
||||||
|
IsRpcThrottleEnabledRequest.newBuilder().build()).getRpcThrottleEnabled();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,6 +185,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormaliz
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
|
||||||
|
@ -243,6 +245,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
||||||
|
@ -3597,4 +3601,28 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
||||||
resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
|
resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
|
||||||
.serverName(serverName).call();
|
.serverName(serverName).call();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
|
||||||
|
CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
|
||||||
|
.action((controller, stub) -> this
|
||||||
|
.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
|
||||||
|
SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
|
||||||
|
(s, c, req, done) -> s.switchRpcThrottle(c, req, done),
|
||||||
|
resp -> resp.getPreviousRpcThrottleEnabled()))
|
||||||
|
.call();
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Boolean> isRpcThrottleEnabled() {
|
||||||
|
CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
|
||||||
|
.action((controller, stub) -> this
|
||||||
|
.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
|
||||||
|
stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
|
||||||
|
(s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
|
||||||
|
resp -> resp.getRpcThrottleEnabled()))
|
||||||
|
.call();
|
||||||
|
return future;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,6 +88,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormaliz
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
|
||||||
|
@ -146,6 +148,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
||||||
|
@ -638,4 +642,16 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
return stub.splitRegion(controller, request);
|
return stub.splitRegion(controller, request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller,
|
||||||
|
SwitchRpcThrottleRequest request) throws ServiceException {
|
||||||
|
return stub.switchRpcThrottle(controller, request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller,
|
||||||
|
IsRpcThrottleEnabledRequest request) throws ServiceException {
|
||||||
|
return stub.isRpcThrottleEnabled(controller, request);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -635,6 +635,21 @@ message ClearDeadServersResponse {
|
||||||
repeated ServerName server_name = 1;
|
repeated ServerName server_name = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message SwitchRpcThrottleRequest {
|
||||||
|
required bool rpc_throttle_enabled = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SwitchRpcThrottleResponse {
|
||||||
|
required bool previous_rpc_throttle_enabled = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message IsRpcThrottleEnabledRequest {
|
||||||
|
}
|
||||||
|
|
||||||
|
message IsRpcThrottleEnabledResponse {
|
||||||
|
required bool rpc_throttle_enabled = 1;
|
||||||
|
}
|
||||||
|
|
||||||
service MasterService {
|
service MasterService {
|
||||||
/** Used by the client to get the number of regions that have received the updated schema */
|
/** Used by the client to get the number of regions that have received the updated schema */
|
||||||
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
|
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
|
||||||
|
@ -988,6 +1003,12 @@ service MasterService {
|
||||||
rpc ClearDeadServers(ClearDeadServersRequest)
|
rpc ClearDeadServers(ClearDeadServersRequest)
|
||||||
returns(ClearDeadServersResponse);
|
returns(ClearDeadServersResponse);
|
||||||
|
|
||||||
|
/** Turn the quota throttle on or off */
|
||||||
|
rpc SwitchRpcThrottle (SwitchRpcThrottleRequest) returns (SwitchRpcThrottleResponse);
|
||||||
|
|
||||||
|
/** Get if is rpc throttled enabled */
|
||||||
|
rpc IsRpcThrottleEnabled (IsRpcThrottleEnabledRequest)
|
||||||
|
returns (IsRpcThrottleEnabledResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
// HBCK Service definitions.
|
// HBCK Service definitions.
|
||||||
|
|
|
@ -488,3 +488,18 @@ message OpenRegionProcedureStateData {
|
||||||
message CloseRegionProcedureStateData {
|
message CloseRegionProcedureStateData {
|
||||||
optional ServerName assign_candidate = 1;
|
optional ServerName assign_candidate = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum SwitchRpcThrottleState {
|
||||||
|
UPDATE_SWITCH_RPC_THROTTLE_STORAGE = 1;
|
||||||
|
SWITCH_RPC_THROTTLE_ON_RS = 2;
|
||||||
|
POST_SWITCH_RPC_THROTTLE = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SwitchRpcThrottleStateData {
|
||||||
|
required bool rpc_throttle_enabled = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SwitchRpcThrottleRemoteStateData {
|
||||||
|
required ServerName target_server = 1;
|
||||||
|
required bool rpc_throttle_enabled = 2;
|
||||||
|
}
|
|
@ -1432,4 +1432,40 @@ public interface MasterObserver {
|
||||||
*/
|
*/
|
||||||
default void postRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
default void postRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||||
ServerName server, List<byte[]> encodedRegionNames) throws IOException {}
|
ServerName server, List<byte[]> encodedRegionNames) throws IOException {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called before switching rpc throttle enabled state.
|
||||||
|
* @param ctx the coprocessor instance's environment
|
||||||
|
* @param enable the rpc throttle value
|
||||||
|
*/
|
||||||
|
default void preSwitchRpcThrottle(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||||
|
final boolean enable) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called after switching rpc throttle enabled state.
|
||||||
|
* @param ctx the coprocessor instance's environment
|
||||||
|
* @param oldValue the previously rpc throttle value
|
||||||
|
* @param newValue the newly rpc throttle value
|
||||||
|
*/
|
||||||
|
default void postSwitchRpcThrottle(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||||
|
final boolean oldValue, final boolean newValue) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called before getting if is rpc throttle enabled.
|
||||||
|
* @param ctx the coprocessor instance's environment
|
||||||
|
*/
|
||||||
|
default void preIsRpcThrottleEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx)
|
||||||
|
throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called after getting if is rpc throttle enabled.
|
||||||
|
* @param ctx the coprocessor instance's environment
|
||||||
|
* @param rpcThrottleEnabled the rpc throttle enabled value
|
||||||
|
*/
|
||||||
|
default void postIsRpcThrottleEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||||
|
final boolean rpcThrottleEnabled) throws IOException {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,6 +140,12 @@ public enum EventType {
|
||||||
* Master asking RS to open a priority region.
|
* Master asking RS to open a priority region.
|
||||||
*/
|
*/
|
||||||
M_RS_OPEN_PRIORITY_REGION (26, ExecutorType.RS_OPEN_PRIORITY_REGION),
|
M_RS_OPEN_PRIORITY_REGION (26, ExecutorType.RS_OPEN_PRIORITY_REGION),
|
||||||
|
/**
|
||||||
|
* Messages originating from Master to RS.<br>
|
||||||
|
* M_RS_SWITCH_RPC_THROTTLE<br>
|
||||||
|
* Master asking RS to switch rpc throttle state.
|
||||||
|
*/
|
||||||
|
M_RS_SWITCH_RPC_THROTTLE(27, ExecutorType.RS_SWITCH_RPC_THROTTLE),
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Messages originating from Client to Master.<br>
|
* Messages originating from Client to Master.<br>
|
||||||
|
|
|
@ -47,7 +47,8 @@ public enum ExecutorType {
|
||||||
RS_REGION_REPLICA_FLUSH_OPS (28),
|
RS_REGION_REPLICA_FLUSH_OPS (28),
|
||||||
RS_COMPACTED_FILES_DISCHARGER (29),
|
RS_COMPACTED_FILES_DISCHARGER (29),
|
||||||
RS_OPEN_PRIORITY_REGION (30),
|
RS_OPEN_PRIORITY_REGION (30),
|
||||||
RS_REFRESH_PEER (31);
|
RS_REFRESH_PEER(31),
|
||||||
|
RS_SWITCH_RPC_THROTTLE(33);
|
||||||
|
|
||||||
ExecutorType(int value) {
|
ExecutorType(int value) {
|
||||||
}
|
}
|
||||||
|
|
|
@ -1683,4 +1683,41 @@ public class MasterCoprocessorHost
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void preSwitchRpcThrottle(boolean enable) throws IOException {
|
||||||
|
execOperation(coprocEnvironments.isEmpty() ? null :new MasterObserverOperation() {
|
||||||
|
@Override
|
||||||
|
public void call(MasterObserver observer) throws IOException {
|
||||||
|
observer.preSwitchRpcThrottle(this, enable);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void postSwitchRpcThrottle(final boolean oldValue, final boolean newValue)
|
||||||
|
throws IOException {
|
||||||
|
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||||
|
@Override
|
||||||
|
public void call(MasterObserver observer) throws IOException {
|
||||||
|
observer.postSwitchRpcThrottle(this, oldValue, newValue);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void preIsRpcThrottleEnabled() throws IOException {
|
||||||
|
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||||
|
@Override
|
||||||
|
public void call(MasterObserver observer) throws IOException {
|
||||||
|
observer.preIsRpcThrottleEnabled(this);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void postIsRpcThrottleEnabled(boolean enabled) throws IOException {
|
||||||
|
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||||
|
@Override
|
||||||
|
public void call(MasterObserver observer) throws IOException {
|
||||||
|
observer.postIsRpcThrottleEnabled(this, enabled);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -258,6 +258,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
||||||
|
@ -2444,6 +2446,28 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller,
|
||||||
|
SwitchRpcThrottleRequest request) throws ServiceException {
|
||||||
|
try {
|
||||||
|
master.checkInitialized();
|
||||||
|
return master.getMasterQuotaManager().switchRpcThrottle(request);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MasterProtos.IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller,
|
||||||
|
MasterProtos.IsRpcThrottleEnabledRequest request) throws ServiceException {
|
||||||
|
try {
|
||||||
|
master.checkInitialized();
|
||||||
|
return master.getMasterQuotaManager().isRpcThrottleEnabled(request);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private boolean containMetaWals(ServerName serverName) throws IOException {
|
private boolean containMetaWals(ServerName serverName) throws IOException {
|
||||||
Path logDir = new Path(master.getWALRootDir(),
|
Path logDir = new Path(master.getWALRootDir(),
|
||||||
AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
|
AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
|
||||||
|
|
|
@ -27,8 +27,8 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface ServerProcedureInterface {
|
public interface ServerProcedureInterface {
|
||||||
public enum ServerOperationType {
|
public enum ServerOperationType {
|
||||||
CRASH_HANDLER
|
CRASH_HANDLER, SWITCH_RPC_THROTTLE
|
||||||
};
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Name of this server instance.
|
* @return Name of this server instance.
|
||||||
|
|
|
@ -35,6 +35,8 @@ class ServerQueue extends Queue<ServerName> {
|
||||||
switch (spi.getServerOperationType()) {
|
switch (spi.getServerOperationType()) {
|
||||||
case CRASH_HANDLER:
|
case CRASH_HANDLER:
|
||||||
return true;
|
return true;
|
||||||
|
case SWITCH_RPC_THROTTLE:
|
||||||
|
return false;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,164 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
|
||||||
|
import org.apache.hadoop.hbase.quotas.RpcThrottleStorage;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleState;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleStateData;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The procedure to switch rpc throttle
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class SwitchRpcThrottleProcedure
|
||||||
|
extends StateMachineProcedure<MasterProcedureEnv, SwitchRpcThrottleState>
|
||||||
|
implements ServerProcedureInterface {
|
||||||
|
|
||||||
|
private static Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleProcedure.class);
|
||||||
|
|
||||||
|
RpcThrottleStorage rpcThrottleStorage;
|
||||||
|
boolean rpcThrottleEnabled;
|
||||||
|
ProcedurePrepareLatch syncLatch;
|
||||||
|
ServerName serverName;
|
||||||
|
int attempts;
|
||||||
|
|
||||||
|
public SwitchRpcThrottleProcedure() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public SwitchRpcThrottleProcedure(RpcThrottleStorage rpcThrottleStorage,
|
||||||
|
boolean rpcThrottleEnabled, ServerName serverName, final ProcedurePrepareLatch syncLatch) {
|
||||||
|
this.rpcThrottleStorage = rpcThrottleStorage;
|
||||||
|
this.syncLatch = syncLatch;
|
||||||
|
this.rpcThrottleEnabled = rpcThrottleEnabled;
|
||||||
|
this.serverName = serverName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Flow executeFromState(MasterProcedureEnv env, SwitchRpcThrottleState state)
|
||||||
|
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||||
|
switch (state) {
|
||||||
|
case UPDATE_SWITCH_RPC_THROTTLE_STORAGE:
|
||||||
|
try {
|
||||||
|
switchThrottleState(env, rpcThrottleEnabled);
|
||||||
|
} catch (IOException e) {
|
||||||
|
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempts++);
|
||||||
|
LOG.warn("Failed to store rpc throttle value {}, sleep {} secs and retry",
|
||||||
|
rpcThrottleEnabled, backoff / 1000, e);
|
||||||
|
setTimeout(Math.toIntExact(backoff));
|
||||||
|
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
|
||||||
|
skipPersistence();
|
||||||
|
throw new ProcedureSuspendedException();
|
||||||
|
}
|
||||||
|
setNextState(SwitchRpcThrottleState.SWITCH_RPC_THROTTLE_ON_RS);
|
||||||
|
return Flow.HAS_MORE_STATE;
|
||||||
|
case SWITCH_RPC_THROTTLE_ON_RS:
|
||||||
|
SwitchRpcThrottleRemoteProcedure[] subProcedures =
|
||||||
|
env.getMasterServices().getServerManager().getOnlineServersList().stream()
|
||||||
|
.map(sn -> new SwitchRpcThrottleRemoteProcedure(sn, rpcThrottleEnabled))
|
||||||
|
.toArray(SwitchRpcThrottleRemoteProcedure[]::new);
|
||||||
|
addChildProcedure(subProcedures);
|
||||||
|
setNextState(SwitchRpcThrottleState.POST_SWITCH_RPC_THROTTLE);
|
||||||
|
return Flow.HAS_MORE_STATE;
|
||||||
|
case POST_SWITCH_RPC_THROTTLE:
|
||||||
|
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
|
||||||
|
return Flow.NO_MORE_STATE;
|
||||||
|
default:
|
||||||
|
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void rollbackState(MasterProcedureEnv env, SwitchRpcThrottleState state)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SwitchRpcThrottleState getState(int stateId) {
|
||||||
|
return SwitchRpcThrottleState.forNumber(stateId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getStateId(SwitchRpcThrottleState throttleState) {
|
||||||
|
return throttleState.getNumber();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SwitchRpcThrottleState getInitialState() {
|
||||||
|
return SwitchRpcThrottleState.UPDATE_SWITCH_RPC_THROTTLE_STORAGE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SwitchRpcThrottleState getCurrentState() {
|
||||||
|
return super.getCurrentState();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
super.serializeStateData(serializer);
|
||||||
|
serializer.serialize(
|
||||||
|
SwitchRpcThrottleStateData.newBuilder().setRpcThrottleEnabled(rpcThrottleEnabled).build());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
super.deserializeStateData(serializer);
|
||||||
|
SwitchRpcThrottleStateData data = serializer.deserialize(SwitchRpcThrottleStateData.class);
|
||||||
|
rpcThrottleEnabled = data.getRpcThrottleEnabled();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerName getServerName() {
|
||||||
|
return serverName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasMetaTableRegion() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerOperationType getServerOperationType() {
|
||||||
|
return ServerOperationType.SWITCH_RPC_THROTTLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void switchThrottleState(MasterProcedureEnv env, boolean rpcThrottleEnabled)
|
||||||
|
throws IOException {
|
||||||
|
rpcThrottleStorage.switchRpcThrottle(rpcThrottleEnabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void toStringClassDetails(StringBuilder sb) {
|
||||||
|
sb.append(getClass().getSimpleName());
|
||||||
|
sb.append(" server=");
|
||||||
|
sb.append(serverName);
|
||||||
|
sb.append(", rpcThrottleEnabled=");
|
||||||
|
sb.append(rpcThrottleEnabled);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,171 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
||||||
|
import org.apache.hadoop.hbase.replication.regionserver.SwitchRpcThrottleRemoteCallable;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The procedure to switch rpc throttle on region server
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class SwitchRpcThrottleRemoteProcedure extends Procedure<MasterProcedureEnv>
|
||||||
|
implements RemoteProcedure<MasterProcedureEnv, ServerName>, ServerProcedureInterface {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleRemoteProcedure.class);
|
||||||
|
private ServerName targetServer;
|
||||||
|
private boolean rpcThrottleEnabled;
|
||||||
|
|
||||||
|
public SwitchRpcThrottleRemoteProcedure() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public SwitchRpcThrottleRemoteProcedure(ServerName serverName, boolean rpcThrottleEnabled) {
|
||||||
|
this.targetServer = serverName;
|
||||||
|
this.rpcThrottleEnabled = rpcThrottleEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean dispatched;
|
||||||
|
private ProcedureEvent<?> event;
|
||||||
|
private boolean succ;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
|
||||||
|
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||||
|
if (dispatched) {
|
||||||
|
if (succ) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
dispatched = false;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
|
||||||
|
} catch (FailedRemoteDispatchException frde) {
|
||||||
|
LOG.warn("Can not add remote operation for switching rpc throttle to {} on {}",
|
||||||
|
rpcThrottleEnabled, targetServer);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
dispatched = true;
|
||||||
|
event = new ProcedureEvent<>(this);
|
||||||
|
event.suspendIfNotReady(this);
|
||||||
|
throw new ProcedureSuspendedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean abort(MasterProcedureEnv env) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
SwitchRpcThrottleRemoteStateData.newBuilder()
|
||||||
|
.setTargetServer(ProtobufUtil.toServerName(targetServer))
|
||||||
|
.setRpcThrottleEnabled(rpcThrottleEnabled).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
SwitchRpcThrottleRemoteStateData data =
|
||||||
|
serializer.deserialize(SwitchRpcThrottleRemoteStateData.class);
|
||||||
|
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
|
||||||
|
rpcThrottleEnabled = data.getRpcThrottleEnabled();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemoteProcedureDispatcher.RemoteOperation
|
||||||
|
remoteCallBuild(MasterProcedureEnv masterProcedureEnv, ServerName remote) {
|
||||||
|
assert targetServer.equals(remote);
|
||||||
|
return new RSProcedureDispatcher.ServerOperation(this, getProcId(),
|
||||||
|
SwitchRpcThrottleRemoteCallable.class,
|
||||||
|
SwitchRpcThrottleRemoteStateData.newBuilder()
|
||||||
|
.setTargetServer(ProtobufUtil.toServerName(remote))
|
||||||
|
.setRpcThrottleEnabled(rpcThrottleEnabled).build()
|
||||||
|
.toByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
|
||||||
|
IOException exception) {
|
||||||
|
complete(env, exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remoteOperationCompleted(MasterProcedureEnv env) {
|
||||||
|
complete(env, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
|
||||||
|
complete(env, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerName getServerName() {
|
||||||
|
return targetServer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasMetaTableRegion() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerOperationType getServerOperationType() {
|
||||||
|
return ServerOperationType.SWITCH_RPC_THROTTLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void complete(MasterProcedureEnv env, Throwable error) {
|
||||||
|
if (error != null) {
|
||||||
|
LOG.warn("Failed to switch rpc throttle to {} on server {}", rpcThrottleEnabled, targetServer,
|
||||||
|
error);
|
||||||
|
this.succ = false;
|
||||||
|
} else {
|
||||||
|
this.succ = true;
|
||||||
|
}
|
||||||
|
event.wake(env.getProcedureScheduler());
|
||||||
|
event = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void toStringClassDetails(StringBuilder sb) {
|
||||||
|
sb.append(getClass().getSimpleName());
|
||||||
|
sb.append(" server=");
|
||||||
|
sb.append(targetServer);
|
||||||
|
sb.append(", rpcThrottleEnabled=");
|
||||||
|
sb.append(rpcThrottleEnabled);
|
||||||
|
}
|
||||||
|
}
|
|
@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.RegionStateListener;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure;
|
||||||
import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
|
import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -44,8 +46,12 @@ import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Master Quota Manager.
|
* Master Quota Manager.
|
||||||
|
@ -69,6 +75,8 @@ public class MasterQuotaManager implements RegionStateListener {
|
||||||
private boolean initialized = false;
|
private boolean initialized = false;
|
||||||
private NamespaceAuditor namespaceQuotaManager;
|
private NamespaceAuditor namespaceQuotaManager;
|
||||||
private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes;
|
private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes;
|
||||||
|
// Storage for quota rpc throttle
|
||||||
|
private RpcThrottleStorage rpcThrottleStorage;
|
||||||
|
|
||||||
public MasterQuotaManager(final MasterServices masterServices) {
|
public MasterQuotaManager(final MasterServices masterServices) {
|
||||||
this.masterServices = masterServices;
|
this.masterServices = masterServices;
|
||||||
|
@ -97,6 +105,9 @@ public class MasterQuotaManager implements RegionStateListener {
|
||||||
namespaceQuotaManager = new NamespaceAuditor(masterServices);
|
namespaceQuotaManager = new NamespaceAuditor(masterServices);
|
||||||
namespaceQuotaManager.start();
|
namespaceQuotaManager.start();
|
||||||
initialized = true;
|
initialized = true;
|
||||||
|
|
||||||
|
rpcThrottleStorage =
|
||||||
|
new RpcThrottleStorage(masterServices.getZooKeeper(), masterServices.getConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop() {
|
public void stop() {
|
||||||
|
@ -300,6 +311,49 @@ public class MasterQuotaManager implements RegionStateListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SwitchRpcThrottleResponse switchRpcThrottle(SwitchRpcThrottleRequest request)
|
||||||
|
throws IOException {
|
||||||
|
boolean rpcThrottle = request.getRpcThrottleEnabled();
|
||||||
|
if (initialized) {
|
||||||
|
masterServices.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle);
|
||||||
|
boolean oldRpcThrottle = rpcThrottleStorage.isRpcThrottleEnabled();
|
||||||
|
if (rpcThrottle != oldRpcThrottle) {
|
||||||
|
LOG.info("{} switch rpc throttle from {} to {}", masterServices.getClientIdAuditPrefix(),
|
||||||
|
oldRpcThrottle, rpcThrottle);
|
||||||
|
ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
|
||||||
|
SwitchRpcThrottleProcedure procedure = new SwitchRpcThrottleProcedure(rpcThrottleStorage,
|
||||||
|
rpcThrottle, masterServices.getServerName(), latch);
|
||||||
|
masterServices.getMasterProcedureExecutor().submitProcedure(procedure);
|
||||||
|
latch.await();
|
||||||
|
} else {
|
||||||
|
LOG.warn("Skip switch rpc throttle to {} because it's the same with old value",
|
||||||
|
rpcThrottle);
|
||||||
|
}
|
||||||
|
SwitchRpcThrottleResponse response = SwitchRpcThrottleResponse.newBuilder()
|
||||||
|
.setPreviousRpcThrottleEnabled(oldRpcThrottle).build();
|
||||||
|
masterServices.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle, rpcThrottle);
|
||||||
|
return response;
|
||||||
|
} else {
|
||||||
|
LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle);
|
||||||
|
return SwitchRpcThrottleResponse.newBuilder().setPreviousRpcThrottleEnabled(false).build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request)
|
||||||
|
throws IOException {
|
||||||
|
if (initialized) {
|
||||||
|
masterServices.getMasterCoprocessorHost().preIsRpcThrottleEnabled();
|
||||||
|
boolean enabled = rpcThrottleStorage.isRpcThrottleEnabled();
|
||||||
|
IsRpcThrottleEnabledResponse response =
|
||||||
|
IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(enabled).build();
|
||||||
|
masterServices.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled);
|
||||||
|
return response;
|
||||||
|
} else {
|
||||||
|
LOG.warn("Skip get rpc throttle because rpc quota is disabled");
|
||||||
|
return IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(false).build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
|
private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
if (req.hasRemoveAll() && req.getRemoveAll() == true) {
|
if (req.hasRemoveAll() && req.getRemoveAll() == true) {
|
||||||
|
|
|
@ -52,9 +52,14 @@ public class RegionServerRpcQuotaManager {
|
||||||
private final RegionServerServices rsServices;
|
private final RegionServerServices rsServices;
|
||||||
|
|
||||||
private QuotaCache quotaCache = null;
|
private QuotaCache quotaCache = null;
|
||||||
|
private volatile boolean rpcThrottleEnabled;
|
||||||
|
// Storage for quota rpc throttle
|
||||||
|
private RpcThrottleStorage rpcThrottleStorage;
|
||||||
|
|
||||||
public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
|
public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
|
||||||
this.rsServices = rsServices;
|
this.rsServices = rsServices;
|
||||||
|
rpcThrottleStorage =
|
||||||
|
new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start(final RpcScheduler rpcScheduler) throws IOException {
|
public void start(final RpcScheduler rpcScheduler) throws IOException {
|
||||||
|
@ -68,6 +73,8 @@ public class RegionServerRpcQuotaManager {
|
||||||
// Initialize quota cache
|
// Initialize quota cache
|
||||||
quotaCache = new QuotaCache(rsServices);
|
quotaCache = new QuotaCache(rsServices);
|
||||||
quotaCache.start();
|
quotaCache.start();
|
||||||
|
rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
|
||||||
|
LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop() {
|
public void stop() {
|
||||||
|
@ -76,10 +83,31 @@ public class RegionServerRpcQuotaManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isQuotaEnabled() {
|
@VisibleForTesting
|
||||||
|
protected boolean isRpcThrottleEnabled() {
|
||||||
|
return rpcThrottleEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isQuotaEnabled() {
|
||||||
return quotaCache != null;
|
return quotaCache != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void switchRpcThrottle(boolean enable) throws IOException {
|
||||||
|
if (isQuotaEnabled()) {
|
||||||
|
if (rpcThrottleEnabled != enable) {
|
||||||
|
boolean previousEnabled = rpcThrottleEnabled;
|
||||||
|
rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
|
||||||
|
LOG.info("Switch rpc throttle from {} to {}", previousEnabled, rpcThrottleEnabled);
|
||||||
|
} else {
|
||||||
|
LOG.warn(
|
||||||
|
"Skip switch rpc throttle because previous value {} is the same as current value {}",
|
||||||
|
rpcThrottleEnabled, enable);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
QuotaCache getQuotaCache() {
|
QuotaCache getQuotaCache() {
|
||||||
return quotaCache;
|
return quotaCache;
|
||||||
|
@ -93,7 +121,7 @@ public class RegionServerRpcQuotaManager {
|
||||||
* @return the OperationQuota
|
* @return the OperationQuota
|
||||||
*/
|
*/
|
||||||
public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
|
public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
|
||||||
if (isQuotaEnabled() && !table.isSystemTable()) {
|
if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) {
|
||||||
UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
|
UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
|
||||||
QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
|
QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
|
||||||
boolean useNoop = userLimiter.isBypass();
|
boolean useNoop = userLimiter.isBypass();
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.quotas;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ZK based rpc throttle storage.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class RpcThrottleStorage {
|
||||||
|
public static final String RPC_THROTTLE_ZNODE = "zookeeper.znode.quota.rpc.throttle";
|
||||||
|
public static final String RPC_THROTTLE_ZNODE_DEFAULT = "rpc-throttle";
|
||||||
|
|
||||||
|
private final ZKWatcher zookeeper;
|
||||||
|
private final String rpcThrottleZNode;
|
||||||
|
|
||||||
|
public RpcThrottleStorage(ZKWatcher zookeeper, Configuration conf) {
|
||||||
|
this.zookeeper = zookeeper;
|
||||||
|
this.rpcThrottleZNode = ZNodePaths.joinZNode(zookeeper.getZNodePaths().baseZNode,
|
||||||
|
conf.get(RPC_THROTTLE_ZNODE, RPC_THROTTLE_ZNODE_DEFAULT));
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isRpcThrottleEnabled() throws IOException {
|
||||||
|
try {
|
||||||
|
byte[] upData = ZKUtil.getData(zookeeper, rpcThrottleZNode);
|
||||||
|
return upData == null || Bytes.toBoolean(upData);
|
||||||
|
} catch (KeeperException | InterruptedException e) {
|
||||||
|
throw new IOException("Failed to get rpc throttle", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store the rpc throttle value.
|
||||||
|
* @param enable Set to <code>true</code> to enable, <code>false</code> to disable.
|
||||||
|
* @throws IOException if an unexpected io exception occurs
|
||||||
|
*/
|
||||||
|
public void switchRpcThrottle(boolean enable) throws IOException {
|
||||||
|
try {
|
||||||
|
byte[] upData = Bytes.toBytes(enable);
|
||||||
|
ZKUtil.createSetData(zookeeper, rpcThrottleZNode, upData);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
throw new IOException("Failed to store rpc throttle", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1934,6 +1934,8 @@ public class HRegionServer extends HasThread implements
|
||||||
}
|
}
|
||||||
this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
|
this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
|
||||||
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
|
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
|
||||||
|
this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE,
|
||||||
|
conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1));
|
||||||
|
|
||||||
Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
|
Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
|
||||||
uncaughtExceptionHandler);
|
uncaughtExceptionHandler);
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.executor.EventType;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The callable executed at RS side to switch rpc throttle state. <br/>
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class SwitchRpcThrottleRemoteCallable implements RSProcedureCallable {
|
||||||
|
private HRegionServer rs;
|
||||||
|
private boolean rpcThrottleEnabled;
|
||||||
|
private Exception initError;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void call() throws Exception {
|
||||||
|
if (initError != null) {
|
||||||
|
throw initError;
|
||||||
|
}
|
||||||
|
rs.getRegionServerRpcQuotaManager().switchRpcThrottle(rpcThrottleEnabled);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(byte[] parameter, HRegionServer rs) {
|
||||||
|
this.rs = rs;
|
||||||
|
try {
|
||||||
|
SwitchRpcThrottleRemoteStateData param =
|
||||||
|
SwitchRpcThrottleRemoteStateData.parseFrom(parameter);
|
||||||
|
rpcThrottleEnabled = param.getRpcThrottleEnabled();
|
||||||
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
initError = e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EventType getEventType() {
|
||||||
|
return EventType.M_RS_SWITCH_RPC_THROTTLE;
|
||||||
|
}
|
||||||
|
}
|
|
@ -2596,6 +2596,18 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
|
||||||
checkSystemOrSuperUser(getActiveUser(ctx));
|
checkSystemOrSuperUser(getActiveUser(ctx));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preSwitchRpcThrottle(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||||
|
boolean enable) throws IOException {
|
||||||
|
requirePermission(ctx, "switchRpcThrottle", Action.ADMIN);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preIsRpcThrottleEnabled(ObserverContext<MasterCoprocessorEnvironment> ctx)
|
||||||
|
throws IOException {
|
||||||
|
requirePermission(ctx, "isRpcThrottleEnabled", Action.ADMIN);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the active user to which authorization checks should be applied.
|
* Returns the active user to which authorization checks should be applied.
|
||||||
* If we are in the context of an RPC call, the remote user is used,
|
* If we are in the context of an RPC call, the remote user is used,
|
||||||
|
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
@ -178,6 +179,14 @@ public class TestAsyncQuotaAdminApi extends TestAsyncAdminBase {
|
||||||
assertNumResults(0, null);
|
assertNumResults(0, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSwitchRpcThrottle() throws Exception {
|
||||||
|
CompletableFuture<Boolean> future1 = ASYNC_CONN.getAdmin().switchRpcThrottle(true);
|
||||||
|
assertEquals(true, future1.get().booleanValue());
|
||||||
|
CompletableFuture<Boolean> future2 = ASYNC_CONN.getAdmin().isRpcThrottleEnabled();
|
||||||
|
assertEquals(true, future2.get().booleanValue());
|
||||||
|
}
|
||||||
|
|
||||||
private void assertNumResults(int expected, final QuotaFilter filter) throws Exception {
|
private void assertNumResults(int expected, final QuotaFilter filter) throws Exception {
|
||||||
assertEquals(expected, countResults(filter));
|
assertEquals(expected, countResults(filter));
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hbase.quotas;
|
package org.apache.hadoop.hbase.quotas;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
@ -47,8 +48,10 @@ import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -520,6 +523,44 @@ public class TestQuotaAdmin {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRpcThrottleWhenStartup() throws IOException, InterruptedException {
|
||||||
|
TEST_UTIL.getAdmin().switchRpcThrottle(false);
|
||||||
|
assertFalse(TEST_UTIL.getAdmin().isRpcThrottleEnabled());
|
||||||
|
TEST_UTIL.killMiniHBaseCluster();
|
||||||
|
|
||||||
|
TEST_UTIL.startMiniHBaseCluster();
|
||||||
|
assertFalse(TEST_UTIL.getAdmin().isRpcThrottleEnabled());
|
||||||
|
for (JVMClusterUtil.RegionServerThread rs : TEST_UTIL.getHBaseCluster()
|
||||||
|
.getRegionServerThreads()) {
|
||||||
|
RegionServerRpcQuotaManager quotaManager =
|
||||||
|
rs.getRegionServer().getRegionServerRpcQuotaManager();
|
||||||
|
assertFalse(quotaManager.isRpcThrottleEnabled());
|
||||||
|
}
|
||||||
|
// enable rpc throttle
|
||||||
|
TEST_UTIL.getAdmin().switchRpcThrottle(true);
|
||||||
|
assertTrue(TEST_UTIL.getAdmin().isRpcThrottleEnabled());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSwitchRpcThrottle() throws IOException {
|
||||||
|
Admin admin = TEST_UTIL.getAdmin();
|
||||||
|
testSwitchRpcThrottle(admin, true, true);
|
||||||
|
testSwitchRpcThrottle(admin, true, false);
|
||||||
|
testSwitchRpcThrottle(admin, false, false);
|
||||||
|
testSwitchRpcThrottle(admin, false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testSwitchRpcThrottle(Admin admin, boolean oldRpcThrottle, boolean newRpcThrottle)
|
||||||
|
throws IOException {
|
||||||
|
boolean state = admin.switchRpcThrottle(newRpcThrottle);
|
||||||
|
Assert.assertEquals(oldRpcThrottle, state);
|
||||||
|
Assert.assertEquals(newRpcThrottle, admin.isRpcThrottleEnabled());
|
||||||
|
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
|
||||||
|
.forEach(rs -> Assert.assertEquals(newRpcThrottle,
|
||||||
|
rs.getRegionServer().getRegionServerRpcQuotaManager().isRpcThrottleEnabled()));
|
||||||
|
}
|
||||||
|
|
||||||
private void verifyRecordPresentInQuotaTable(ThrottleType type, long limit, TimeUnit tu)
|
private void verifyRecordPresentInQuotaTable(ThrottleType type, long limit, TimeUnit tu)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
// Verify the RPC Quotas in the table
|
// Verify the RPC Quotas in the table
|
||||||
|
|
|
@ -3400,6 +3400,32 @@ public class TestAccessController extends SecureTestUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSwitchRpcThrottle() throws Exception {
|
||||||
|
AccessTestAction action = new AccessTestAction() {
|
||||||
|
@Override
|
||||||
|
public Object run() throws Exception {
|
||||||
|
ACCESS_CONTROLLER.preSwitchRpcThrottle(ObserverContextImpl.createAndPrepare(CP_ENV), true);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
verifyAllowed(action, SUPERUSER, USER_ADMIN);
|
||||||
|
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIsRpcThrottleEnabled() throws Exception {
|
||||||
|
AccessTestAction action = new AccessTestAction() {
|
||||||
|
@Override
|
||||||
|
public Object run() throws Exception {
|
||||||
|
ACCESS_CONTROLLER.preIsRpcThrottleEnabled(ObserverContextImpl.createAndPrepare(CP_ENV));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
verifyAllowed(action, SUPERUSER, USER_ADMIN);
|
||||||
|
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Validate Global User ACL
|
* Validate Global User ACL
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -247,6 +247,10 @@ module Hbase
|
||||||
QuotaTableUtil.getObservedSnapshotSizes(@admin.getConnection())
|
QuotaTableUtil.getObservedSnapshotSizes(@admin.getConnection())
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def switch_rpc_throttle(enabled)
|
||||||
|
@admin.switchRpcThrottle(java.lang.Boolean.valueOf(enabled))
|
||||||
|
end
|
||||||
|
|
||||||
def _parse_size(str_limit)
|
def _parse_size(str_limit)
|
||||||
str_limit = str_limit.downcase
|
str_limit = str_limit.downcase
|
||||||
match = /(\d+)([bkmgtp%]*)/.match(str_limit)
|
match = /(\d+)([bkmgtp%]*)/.match(str_limit)
|
||||||
|
|
|
@ -434,6 +434,8 @@ Shell.load_command_group(
|
||||||
list_quota_table_sizes
|
list_quota_table_sizes
|
||||||
list_quota_snapshots
|
list_quota_snapshots
|
||||||
list_snapshot_sizes
|
list_snapshot_sizes
|
||||||
|
enable_rpc_throttle
|
||||||
|
disable_rpc_throttle
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
#
|
||||||
|
#
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
module Shell
|
||||||
|
module Commands
|
||||||
|
class DisableRpcThrottle < Command
|
||||||
|
def help
|
||||||
|
return <<-EOF
|
||||||
|
Disable quota rpc throttle. Returns previous rpc throttle enabled value.
|
||||||
|
NOTE: if quota is not enabled, this will not work and always return false.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
hbase> disable_rpc_throttle
|
||||||
|
EOF
|
||||||
|
end
|
||||||
|
|
||||||
|
def command
|
||||||
|
prev_state = quotas_admin.switch_rpc_throttle(false) ? 'true' : 'false'
|
||||||
|
formatter.row(["Previous rpc throttle state : #{prev_state}"])
|
||||||
|
prev_state
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -0,0 +1,40 @@
|
||||||
|
#
|
||||||
|
#
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
module Shell
|
||||||
|
module Commands
|
||||||
|
class EnableRpcThrottle < Command
|
||||||
|
def help
|
||||||
|
return <<-EOF
|
||||||
|
Enable quota rpc throttle. Returns previous rpc throttle enabled value.
|
||||||
|
NOTE: if quota is not enabled, this will not work and always return false.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
hbase> enable_rpc_throttle
|
||||||
|
EOF
|
||||||
|
end
|
||||||
|
|
||||||
|
def command
|
||||||
|
prev_state = quotas_admin.switch_rpc_throttle(true) ? 'true' : 'false'
|
||||||
|
formatter.row(["Previous rpc throttle state : #{prev_state}"])
|
||||||
|
prev_state
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -162,5 +162,13 @@ module Hbase
|
||||||
output = capture_stdout{ command(:list_quotas) }
|
output = capture_stdout{ command(:list_quotas) }
|
||||||
assert(output.include?('0 row(s)'))
|
assert(output.include?('0 row(s)'))
|
||||||
end
|
end
|
||||||
|
|
||||||
|
define_test 'switch rpc throttle' do
|
||||||
|
output = capture_stdout { command(:disable_rpc_throttle) }
|
||||||
|
assert(output.include?('Previous rpc throttle state : true'))
|
||||||
|
|
||||||
|
output = capture_stdout { command(:enable_rpc_throttle) }
|
||||||
|
assert(output.include?('Previous rpc throttle state : false'))
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue