HBASE-21159 Add shell command to switch throttle on or off
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
94093e869a
commit
77db1fae09
|
@ -2796,4 +2796,17 @@ public interface Admin extends Abortable, Closeable {
|
|||
*/
|
||||
void cloneTableSchema(final TableName tableName, final TableName newTableName,
|
||||
final boolean preserveSplits) throws IOException;
|
||||
|
||||
/**
|
||||
* Switch the rpc throttle enable state.
|
||||
* @param enable Set to <code>true</code> to enable, <code>false</code> to disable.
|
||||
* @return Previous rpc throttle enabled value
|
||||
*/
|
||||
boolean switchRpcThrottle(final boolean enable) throws IOException;
|
||||
|
||||
/**
|
||||
* Get if the rpc throttle is enabled.
|
||||
* @return True if rpc throttle is enabled
|
||||
*/
|
||||
boolean isRpcThrottleEnabled() throws IOException;
|
||||
}
|
||||
|
|
|
@ -1287,4 +1287,17 @@ public interface AsyncAdmin {
|
|||
*/
|
||||
CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
|
||||
List<String> serverNamesList);
|
||||
|
||||
/**
|
||||
* Switch the rpc throttle enabled state.
|
||||
* @param enable Set to <code>true</code> to enable, <code>false</code> to disable.
|
||||
* @return Previous rpc throttle enabled value
|
||||
*/
|
||||
CompletableFuture<Boolean> switchRpcThrottle(boolean enable);
|
||||
|
||||
/**
|
||||
* Get if the rpc throttle is enabled.
|
||||
* @return True if rpc throttle is enabled
|
||||
*/
|
||||
CompletableFuture<Boolean> isRpcThrottleEnabled() throws IOException;
|
||||
}
|
||||
|
|
|
@ -765,4 +765,14 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
List<String> serverNamesList) {
|
||||
return wrap(rawAdmin.compactionSwitch(switchState, serverNamesList));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
|
||||
return wrap(rawAdmin.switchRpcThrottle(enable));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> isRpcThrottleEnabled() {
|
||||
return wrap(rawAdmin.isRpcThrottleEnabled());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,6 +101,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancer
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
|
||||
|
@ -111,6 +113,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCa
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
|
||||
|
@ -1757,6 +1761,18 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException {
|
||||
return stub.transitReplicationPeerSyncReplicationState(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller,
|
||||
SwitchRpcThrottleRequest request) throws ServiceException {
|
||||
return stub.switchRpcThrottle(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller,
|
||||
IsRpcThrottleEnabledRequest request) throws ServiceException {
|
||||
return stub.isRpcThrottleEnabled(controller, request);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -173,6 +173,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMainte
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
|
||||
|
@ -4341,4 +4342,28 @@ public class HBaseAdmin implements Admin {
|
|||
createTable(htd);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean switchRpcThrottle(final boolean enable) throws IOException {
|
||||
return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
|
||||
@Override
|
||||
protected Boolean rpcCall() throws Exception {
|
||||
return this.master
|
||||
.switchRpcThrottle(getRpcController(), MasterProtos.SwitchRpcThrottleRequest
|
||||
.newBuilder().setRpcThrottleEnabled(enable).build())
|
||||
.getPreviousRpcThrottleEnabled();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRpcThrottleEnabled() throws IOException {
|
||||
return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
|
||||
@Override
|
||||
protected Boolean rpcCall() throws Exception {
|
||||
return this.master.isRpcThrottleEnabled(getRpcController(),
|
||||
IsRpcThrottleEnabledRequest.newBuilder().build()).getRpcThrottleEnabled();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -186,6 +186,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormaliz
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
|
||||
|
@ -244,6 +246,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
||||
|
@ -3610,4 +3614,28 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
|
||||
.serverName(serverName).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
|
||||
CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
|
||||
.action((controller, stub) -> this
|
||||
.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
|
||||
SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
|
||||
(s, c, req, done) -> s.switchRpcThrottle(c, req, done),
|
||||
resp -> resp.getPreviousRpcThrottleEnabled()))
|
||||
.call();
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> isRpcThrottleEnabled() {
|
||||
CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
|
||||
.action((controller, stub) -> this
|
||||
.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
|
||||
stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
|
||||
(s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
|
||||
resp -> resp.getRpcThrottleEnabled()))
|
||||
.call();
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,6 +88,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormaliz
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
|
||||
|
@ -146,6 +148,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
||||
|
@ -647,4 +651,16 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
|
|||
TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException {
|
||||
return stub.transitReplicationPeerSyncReplicationState(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller,
|
||||
SwitchRpcThrottleRequest request) throws ServiceException {
|
||||
return stub.switchRpcThrottle(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller,
|
||||
IsRpcThrottleEnabledRequest request) throws ServiceException {
|
||||
return stub.isRpcThrottleEnabled(controller, request);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -635,6 +635,21 @@ message ClearDeadServersResponse {
|
|||
repeated ServerName server_name = 1;
|
||||
}
|
||||
|
||||
message SwitchRpcThrottleRequest {
|
||||
required bool rpc_throttle_enabled = 1;
|
||||
}
|
||||
|
||||
message SwitchRpcThrottleResponse {
|
||||
required bool previous_rpc_throttle_enabled = 1;
|
||||
}
|
||||
|
||||
message IsRpcThrottleEnabledRequest {
|
||||
}
|
||||
|
||||
message IsRpcThrottleEnabledResponse {
|
||||
required bool rpc_throttle_enabled = 1;
|
||||
}
|
||||
|
||||
service MasterService {
|
||||
/** Used by the client to get the number of regions that have received the updated schema */
|
||||
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
|
||||
|
@ -992,6 +1007,12 @@ service MasterService {
|
|||
rpc ClearDeadServers(ClearDeadServersRequest)
|
||||
returns(ClearDeadServersResponse);
|
||||
|
||||
/** Turn the quota throttle on or off */
|
||||
rpc SwitchRpcThrottle (SwitchRpcThrottleRequest) returns (SwitchRpcThrottleResponse);
|
||||
|
||||
/** Get if is rpc throttled enabled */
|
||||
rpc IsRpcThrottleEnabled (IsRpcThrottleEnabledRequest)
|
||||
returns (IsRpcThrottleEnabledResponse);
|
||||
}
|
||||
|
||||
// HBCK Service definitions.
|
||||
|
|
|
@ -551,3 +551,18 @@ message OpenRegionProcedureStateData {
|
|||
message CloseRegionProcedureStateData {
|
||||
optional ServerName assign_candidate = 1;
|
||||
}
|
||||
|
||||
enum SwitchRpcThrottleState {
|
||||
UPDATE_SWITCH_RPC_THROTTLE_STORAGE = 1;
|
||||
SWITCH_RPC_THROTTLE_ON_RS = 2;
|
||||
POST_SWITCH_RPC_THROTTLE = 3;
|
||||
}
|
||||
|
||||
message SwitchRpcThrottleStateData {
|
||||
required bool rpc_throttle_enabled = 1;
|
||||
}
|
||||
|
||||
message SwitchRpcThrottleRemoteStateData {
|
||||
required ServerName target_server = 1;
|
||||
required bool rpc_throttle_enabled = 2;
|
||||
}
|
|
@ -1519,4 +1519,40 @@ public interface MasterObserver {
|
|||
*/
|
||||
default void postRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
ServerName server, List<byte[]> encodedRegionNames) throws IOException {}
|
||||
|
||||
/**
|
||||
* Called before switching rpc throttle enabled state.
|
||||
* @param ctx the coprocessor instance's environment
|
||||
* @param enable the rpc throttle value
|
||||
*/
|
||||
default void preSwitchRpcThrottle(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final boolean enable) throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after switching rpc throttle enabled state.
|
||||
* @param ctx the coprocessor instance's environment
|
||||
* @param oldValue the previously rpc throttle value
|
||||
* @param newValue the newly rpc throttle value
|
||||
*/
|
||||
default void postSwitchRpcThrottle(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final boolean oldValue, final boolean newValue) throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before getting if is rpc throttle enabled.
|
||||
* @param ctx the coprocessor instance's environment
|
||||
*/
|
||||
default void preIsRpcThrottleEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after getting if is rpc throttle enabled.
|
||||
* @param ctx the coprocessor instance's environment
|
||||
* @param rpcThrottleEnabled the rpc throttle enabled value
|
||||
*/
|
||||
default void postIsRpcThrottleEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final boolean rpcThrottleEnabled) throws IOException {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -140,6 +140,12 @@ public enum EventType {
|
|||
* Master asking RS to open a priority region.
|
||||
*/
|
||||
M_RS_OPEN_PRIORITY_REGION (26, ExecutorType.RS_OPEN_PRIORITY_REGION),
|
||||
/**
|
||||
* Messages originating from Master to RS.<br>
|
||||
* M_RS_SWITCH_RPC_THROTTLE<br>
|
||||
* Master asking RS to switch rpc throttle state.
|
||||
*/
|
||||
M_RS_SWITCH_RPC_THROTTLE(27, ExecutorType.RS_SWITCH_RPC_THROTTLE),
|
||||
|
||||
/**
|
||||
* Messages originating from Client to Master.<br>
|
||||
|
|
|
@ -48,7 +48,8 @@ public enum ExecutorType {
|
|||
RS_COMPACTED_FILES_DISCHARGER (29),
|
||||
RS_OPEN_PRIORITY_REGION (30),
|
||||
RS_REFRESH_PEER(31),
|
||||
RS_REPLAY_SYNC_REPLICATION_WAL(32);
|
||||
RS_REPLAY_SYNC_REPLICATION_WAL(32),
|
||||
RS_SWITCH_RPC_THROTTLE(33);
|
||||
|
||||
ExecutorType(int value) {
|
||||
}
|
||||
|
|
|
@ -1777,4 +1777,41 @@ public class MasterCoprocessorHost
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void preSwitchRpcThrottle(boolean enable) throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null :new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preSwitchRpcThrottle(this, enable);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void postSwitchRpcThrottle(final boolean oldValue, final boolean newValue)
|
||||
throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.postSwitchRpcThrottle(this, oldValue, newValue);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void preIsRpcThrottleEnabled() throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preIsRpcThrottleEnabled(this);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void postIsRpcThrottleEnabled(boolean enabled) throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.postIsRpcThrottleEnabled(this, enabled);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -258,6 +258,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
||||
|
@ -2478,6 +2480,28 @@ public class MasterRpcServices extends RSRpcServices
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller,
|
||||
SwitchRpcThrottleRequest request) throws ServiceException {
|
||||
try {
|
||||
master.checkInitialized();
|
||||
return master.getMasterQuotaManager().switchRpcThrottle(request);
|
||||
} catch (Exception e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterProtos.IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller,
|
||||
MasterProtos.IsRpcThrottleEnabledRequest request) throws ServiceException {
|
||||
try {
|
||||
master.checkInitialized();
|
||||
return master.getMasterQuotaManager().isRpcThrottleEnabled(request);
|
||||
} catch (Exception e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean containMetaWals(ServerName serverName) throws IOException {
|
||||
Path logDir = new Path(master.getWALRootDir(),
|
||||
AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
@InterfaceAudience.Private
|
||||
public interface ServerProcedureInterface {
|
||||
public enum ServerOperationType {
|
||||
CRASH_HANDLER
|
||||
CRASH_HANDLER, SWITCH_RPC_THROTTLE
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -35,6 +35,8 @@ class ServerQueue extends Queue<ServerName> {
|
|||
switch (spi.getServerOperationType()) {
|
||||
case CRASH_HANDLER:
|
||||
return true;
|
||||
case SWITCH_RPC_THROTTLE:
|
||||
return false;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
|
||||
import org.apache.hadoop.hbase.quotas.RpcThrottleStorage;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleStateData;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||
|
||||
/**
|
||||
* The procedure to switch rpc throttle
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SwitchRpcThrottleProcedure
|
||||
extends StateMachineProcedure<MasterProcedureEnv, SwitchRpcThrottleState>
|
||||
implements ServerProcedureInterface {
|
||||
|
||||
private static Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleProcedure.class);
|
||||
|
||||
RpcThrottleStorage rpcThrottleStorage;
|
||||
boolean rpcThrottleEnabled;
|
||||
ProcedurePrepareLatch syncLatch;
|
||||
ServerName serverName;
|
||||
int attempts;
|
||||
|
||||
public SwitchRpcThrottleProcedure() {
|
||||
}
|
||||
|
||||
public SwitchRpcThrottleProcedure(RpcThrottleStorage rpcThrottleStorage,
|
||||
boolean rpcThrottleEnabled, ServerName serverName, final ProcedurePrepareLatch syncLatch) {
|
||||
this.rpcThrottleStorage = rpcThrottleStorage;
|
||||
this.syncLatch = syncLatch;
|
||||
this.rpcThrottleEnabled = rpcThrottleEnabled;
|
||||
this.serverName = serverName;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(MasterProcedureEnv env, SwitchRpcThrottleState state)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
switch (state) {
|
||||
case UPDATE_SWITCH_RPC_THROTTLE_STORAGE:
|
||||
try {
|
||||
switchThrottleState(env, rpcThrottleEnabled);
|
||||
} catch (IOException e) {
|
||||
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempts++);
|
||||
LOG.warn("Failed to store rpc throttle value {}, sleep {} secs and retry",
|
||||
rpcThrottleEnabled, backoff / 1000, e);
|
||||
setTimeout(Math.toIntExact(backoff));
|
||||
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
|
||||
skipPersistence();
|
||||
throw new ProcedureSuspendedException();
|
||||
}
|
||||
setNextState(SwitchRpcThrottleState.SWITCH_RPC_THROTTLE_ON_RS);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case SWITCH_RPC_THROTTLE_ON_RS:
|
||||
SwitchRpcThrottleRemoteProcedure[] subProcedures =
|
||||
env.getMasterServices().getServerManager().getOnlineServersList().stream()
|
||||
.map(sn -> new SwitchRpcThrottleRemoteProcedure(sn, rpcThrottleEnabled))
|
||||
.toArray(SwitchRpcThrottleRemoteProcedure[]::new);
|
||||
addChildProcedure(subProcedures);
|
||||
setNextState(SwitchRpcThrottleState.POST_SWITCH_RPC_THROTTLE);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case POST_SWITCH_RPC_THROTTLE:
|
||||
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
|
||||
return Flow.NO_MORE_STATE;
|
||||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollbackState(MasterProcedureEnv env, SwitchRpcThrottleState state)
|
||||
throws IOException, InterruptedException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SwitchRpcThrottleState getState(int stateId) {
|
||||
return SwitchRpcThrottleState.forNumber(stateId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getStateId(SwitchRpcThrottleState throttleState) {
|
||||
return throttleState.getNumber();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SwitchRpcThrottleState getInitialState() {
|
||||
return SwitchRpcThrottleState.UPDATE_SWITCH_RPC_THROTTLE_STORAGE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SwitchRpcThrottleState getCurrentState() {
|
||||
return super.getCurrentState();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.serializeStateData(serializer);
|
||||
serializer.serialize(
|
||||
SwitchRpcThrottleStateData.newBuilder().setRpcThrottleEnabled(rpcThrottleEnabled).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.deserializeStateData(serializer);
|
||||
SwitchRpcThrottleStateData data = serializer.deserialize(SwitchRpcThrottleStateData.class);
|
||||
rpcThrottleEnabled = data.getRpcThrottleEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerName getServerName() {
|
||||
return serverName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasMetaTableRegion() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerOperationType getServerOperationType() {
|
||||
return ServerOperationType.SWITCH_RPC_THROTTLE;
|
||||
}
|
||||
|
||||
public void switchThrottleState(MasterProcedureEnv env, boolean rpcThrottleEnabled)
|
||||
throws IOException {
|
||||
rpcThrottleStorage.switchRpcThrottle(rpcThrottleEnabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toStringClassDetails(StringBuilder sb) {
|
||||
sb.append(getClass().getSimpleName());
|
||||
sb.append(" server=");
|
||||
sb.append(serverName);
|
||||
sb.append(", rpcThrottleEnabled=");
|
||||
sb.append(rpcThrottleEnabled);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,171 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.SwitchRpcThrottleRemoteCallable;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData;
|
||||
|
||||
/**
|
||||
* The procedure to switch rpc throttle on region server
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SwitchRpcThrottleRemoteProcedure extends Procedure<MasterProcedureEnv>
|
||||
implements RemoteProcedure<MasterProcedureEnv, ServerName>, ServerProcedureInterface {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleRemoteProcedure.class);
|
||||
private ServerName targetServer;
|
||||
private boolean rpcThrottleEnabled;
|
||||
|
||||
public SwitchRpcThrottleRemoteProcedure() {
|
||||
}
|
||||
|
||||
public SwitchRpcThrottleRemoteProcedure(ServerName serverName, boolean rpcThrottleEnabled) {
|
||||
this.targetServer = serverName;
|
||||
this.rpcThrottleEnabled = rpcThrottleEnabled;
|
||||
}
|
||||
|
||||
private boolean dispatched;
|
||||
private ProcedureEvent<?> event;
|
||||
private boolean succ;
|
||||
|
||||
@Override
|
||||
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
|
||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||
if (dispatched) {
|
||||
if (succ) {
|
||||
return null;
|
||||
}
|
||||
dispatched = false;
|
||||
}
|
||||
try {
|
||||
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
|
||||
} catch (FailedRemoteDispatchException frde) {
|
||||
LOG.warn("Can not add remote operation for switching rpc throttle to {} on {}",
|
||||
rpcThrottleEnabled, targetServer);
|
||||
return null;
|
||||
}
|
||||
dispatched = true;
|
||||
event = new ProcedureEvent<>(this);
|
||||
event.suspendIfNotReady(this);
|
||||
throw new ProcedureSuspendedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean abort(MasterProcedureEnv env) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
SwitchRpcThrottleRemoteStateData.newBuilder()
|
||||
.setTargetServer(ProtobufUtil.toServerName(targetServer))
|
||||
.setRpcThrottleEnabled(rpcThrottleEnabled).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
SwitchRpcThrottleRemoteStateData data =
|
||||
serializer.deserialize(SwitchRpcThrottleRemoteStateData.class);
|
||||
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
|
||||
rpcThrottleEnabled = data.getRpcThrottleEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteProcedureDispatcher.RemoteOperation
|
||||
remoteCallBuild(MasterProcedureEnv masterProcedureEnv, ServerName remote) {
|
||||
assert targetServer.equals(remote);
|
||||
return new RSProcedureDispatcher.ServerOperation(this, getProcId(),
|
||||
SwitchRpcThrottleRemoteCallable.class,
|
||||
SwitchRpcThrottleRemoteStateData.newBuilder()
|
||||
.setTargetServer(ProtobufUtil.toServerName(remote))
|
||||
.setRpcThrottleEnabled(rpcThrottleEnabled).build()
|
||||
.toByteArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
|
||||
IOException exception) {
|
||||
complete(env, exception);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remoteOperationCompleted(MasterProcedureEnv env) {
|
||||
complete(env, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
|
||||
complete(env, error);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerName getServerName() {
|
||||
return targetServer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasMetaTableRegion() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerOperationType getServerOperationType() {
|
||||
return ServerOperationType.SWITCH_RPC_THROTTLE;
|
||||
}
|
||||
|
||||
private void complete(MasterProcedureEnv env, Throwable error) {
|
||||
if (error != null) {
|
||||
LOG.warn("Failed to switch rpc throttle to {} on server {}", rpcThrottleEnabled, targetServer,
|
||||
error);
|
||||
this.succ = false;
|
||||
} else {
|
||||
this.succ = true;
|
||||
}
|
||||
event.wake(env.getProcedureScheduler());
|
||||
event = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toStringClassDetails(StringBuilder sb) {
|
||||
sb.append(getClass().getSimpleName());
|
||||
sb.append(" server=");
|
||||
sb.append(targetServer);
|
||||
sb.append(", rpcThrottleEnabled=");
|
||||
sb.append(rpcThrottleEnabled);
|
||||
}
|
||||
}
|
|
@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
||||
import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure;
|
||||
import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -52,8 +54,12 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
|||
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize;
|
||||
|
||||
|
@ -79,6 +85,8 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
private boolean initialized = false;
|
||||
private NamespaceAuditor namespaceQuotaManager;
|
||||
private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes;
|
||||
// Storage for quota rpc throttle
|
||||
private RpcThrottleStorage rpcThrottleStorage;
|
||||
|
||||
public MasterQuotaManager(final MasterServices masterServices) {
|
||||
this.masterServices = masterServices;
|
||||
|
@ -107,6 +115,9 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
namespaceQuotaManager = new NamespaceAuditor(masterServices);
|
||||
namespaceQuotaManager.start();
|
||||
initialized = true;
|
||||
|
||||
rpcThrottleStorage =
|
||||
new RpcThrottleStorage(masterServices.getZooKeeper(), masterServices.getConfiguration());
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
|
@ -310,6 +321,49 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
}
|
||||
}
|
||||
|
||||
public SwitchRpcThrottleResponse switchRpcThrottle(SwitchRpcThrottleRequest request)
|
||||
throws IOException {
|
||||
boolean rpcThrottle = request.getRpcThrottleEnabled();
|
||||
if (initialized) {
|
||||
masterServices.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle);
|
||||
boolean oldRpcThrottle = rpcThrottleStorage.isRpcThrottleEnabled();
|
||||
if (rpcThrottle != oldRpcThrottle) {
|
||||
LOG.info("{} switch rpc throttle from {} to {}", masterServices.getClientIdAuditPrefix(),
|
||||
oldRpcThrottle, rpcThrottle);
|
||||
ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
|
||||
SwitchRpcThrottleProcedure procedure = new SwitchRpcThrottleProcedure(rpcThrottleStorage,
|
||||
rpcThrottle, masterServices.getServerName(), latch);
|
||||
masterServices.getMasterProcedureExecutor().submitProcedure(procedure);
|
||||
latch.await();
|
||||
} else {
|
||||
LOG.warn("Skip switch rpc throttle to {} because it's the same with old value",
|
||||
rpcThrottle);
|
||||
}
|
||||
SwitchRpcThrottleResponse response = SwitchRpcThrottleResponse.newBuilder()
|
||||
.setPreviousRpcThrottleEnabled(oldRpcThrottle).build();
|
||||
masterServices.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle, rpcThrottle);
|
||||
return response;
|
||||
} else {
|
||||
LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle);
|
||||
return SwitchRpcThrottleResponse.newBuilder().setPreviousRpcThrottleEnabled(false).build();
|
||||
}
|
||||
}
|
||||
|
||||
public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request)
|
||||
throws IOException {
|
||||
if (initialized) {
|
||||
masterServices.getMasterCoprocessorHost().preIsRpcThrottleEnabled();
|
||||
boolean enabled = rpcThrottleStorage.isRpcThrottleEnabled();
|
||||
IsRpcThrottleEnabledResponse response =
|
||||
IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(enabled).build();
|
||||
masterServices.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled);
|
||||
return response;
|
||||
} else {
|
||||
LOG.warn("Skip get rpc throttle because rpc quota is disabled");
|
||||
return IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(false).build();
|
||||
}
|
||||
}
|
||||
|
||||
private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
|
||||
throws IOException, InterruptedException {
|
||||
if (req.hasRemoveAll() && req.getRemoveAll() == true) {
|
||||
|
|
|
@ -52,9 +52,14 @@ public class RegionServerRpcQuotaManager {
|
|||
private final RegionServerServices rsServices;
|
||||
|
||||
private QuotaCache quotaCache = null;
|
||||
private volatile boolean rpcThrottleEnabled;
|
||||
// Storage for quota rpc throttle
|
||||
private RpcThrottleStorage rpcThrottleStorage;
|
||||
|
||||
public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
|
||||
this.rsServices = rsServices;
|
||||
rpcThrottleStorage =
|
||||
new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration());
|
||||
}
|
||||
|
||||
public void start(final RpcScheduler rpcScheduler) throws IOException {
|
||||
|
@ -68,6 +73,8 @@ public class RegionServerRpcQuotaManager {
|
|||
// Initialize quota cache
|
||||
quotaCache = new QuotaCache(rsServices);
|
||||
quotaCache.start();
|
||||
rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
|
||||
LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
|
@ -76,10 +83,31 @@ public class RegionServerRpcQuotaManager {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean isQuotaEnabled() {
|
||||
@VisibleForTesting
|
||||
protected boolean isRpcThrottleEnabled() {
|
||||
return rpcThrottleEnabled;
|
||||
}
|
||||
|
||||
private boolean isQuotaEnabled() {
|
||||
return quotaCache != null;
|
||||
}
|
||||
|
||||
public void switchRpcThrottle(boolean enable) throws IOException {
|
||||
if (isQuotaEnabled()) {
|
||||
if (rpcThrottleEnabled != enable) {
|
||||
boolean previousEnabled = rpcThrottleEnabled;
|
||||
rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
|
||||
LOG.info("Switch rpc throttle from {} to {}", previousEnabled, rpcThrottleEnabled);
|
||||
} else {
|
||||
LOG.warn(
|
||||
"Skip switch rpc throttle because previous value {} is the same as current value {}",
|
||||
rpcThrottleEnabled, enable);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
QuotaCache getQuotaCache() {
|
||||
return quotaCache;
|
||||
|
@ -93,7 +121,7 @@ public class RegionServerRpcQuotaManager {
|
|||
* @return the OperationQuota
|
||||
*/
|
||||
public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
|
||||
if (isQuotaEnabled() && !table.isSystemTable()) {
|
||||
if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) {
|
||||
UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
|
||||
QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
|
||||
boolean useNoop = userLimiter.isBypass();
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* ZK based rpc throttle storage.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RpcThrottleStorage {
|
||||
public static final String RPC_THROTTLE_ZNODE = "zookeeper.znode.quota.rpc.throttle";
|
||||
public static final String RPC_THROTTLE_ZNODE_DEFAULT = "rpc-throttle";
|
||||
|
||||
private final ZKWatcher zookeeper;
|
||||
private final String rpcThrottleZNode;
|
||||
|
||||
public RpcThrottleStorage(ZKWatcher zookeeper, Configuration conf) {
|
||||
this.zookeeper = zookeeper;
|
||||
this.rpcThrottleZNode = ZNodePaths.joinZNode(zookeeper.getZNodePaths().baseZNode,
|
||||
conf.get(RPC_THROTTLE_ZNODE, RPC_THROTTLE_ZNODE_DEFAULT));
|
||||
}
|
||||
|
||||
public boolean isRpcThrottleEnabled() throws IOException {
|
||||
try {
|
||||
byte[] upData = ZKUtil.getData(zookeeper, rpcThrottleZNode);
|
||||
return upData == null || Bytes.toBoolean(upData);
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
throw new IOException("Failed to get rpc throttle", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the rpc throttle value.
|
||||
* @param enable Set to <code>true</code> to enable, <code>false</code> to disable.
|
||||
* @throws IOException if an unexpected io exception occurs
|
||||
*/
|
||||
public void switchRpcThrottle(boolean enable) throws IOException {
|
||||
try {
|
||||
byte[] upData = Bytes.toBytes(enable);
|
||||
ZKUtil.createSetData(zookeeper, rpcThrottleZNode, upData);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed to store rpc throttle", e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1963,6 +1963,8 @@ public class HRegionServer extends HasThread implements
|
|||
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
|
||||
this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL,
|
||||
conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1));
|
||||
this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE,
|
||||
conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1));
|
||||
|
||||
Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
|
||||
uncaughtExceptionHandler);
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData;
|
||||
|
||||
/**
|
||||
* The callable executed at RS side to switch rpc throttle state. <br/>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SwitchRpcThrottleRemoteCallable implements RSProcedureCallable {
|
||||
private HRegionServer rs;
|
||||
private boolean rpcThrottleEnabled;
|
||||
private Exception initError;
|
||||
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
if (initError != null) {
|
||||
throw initError;
|
||||
}
|
||||
rs.getRegionServerRpcQuotaManager().switchRpcThrottle(rpcThrottleEnabled);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(byte[] parameter, HRegionServer rs) {
|
||||
this.rs = rs;
|
||||
try {
|
||||
SwitchRpcThrottleRemoteStateData param =
|
||||
SwitchRpcThrottleRemoteStateData.parseFrom(parameter);
|
||||
rpcThrottleEnabled = param.getRpcThrottleEnabled();
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
initError = e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventType getEventType() {
|
||||
return EventType.M_RS_SWITCH_RPC_THROTTLE;
|
||||
}
|
||||
}
|
|
@ -2602,6 +2602,18 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
|
|||
checkSystemOrSuperUser(getActiveUser(ctx));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preSwitchRpcThrottle(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
boolean enable) throws IOException {
|
||||
requirePermission(ctx, "switchRpcThrottle", Action.ADMIN);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preIsRpcThrottleEnabled(ObserverContext<MasterCoprocessorEnvironment> ctx)
|
||||
throws IOException {
|
||||
requirePermission(ctx, "isRpcThrottleEnabled", Action.ADMIN);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the active user to which authorization checks should be applied.
|
||||
* If we are in the context of an RPC call, the remote user is used,
|
||||
|
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -178,6 +179,14 @@ public class TestAsyncQuotaAdminApi extends TestAsyncAdminBase {
|
|||
assertNumResults(0, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSwitchRpcThrottle() throws Exception {
|
||||
CompletableFuture<Boolean> future1 = ASYNC_CONN.getAdmin().switchRpcThrottle(true);
|
||||
assertEquals(true, future1.get().booleanValue());
|
||||
CompletableFuture<Boolean> future2 = ASYNC_CONN.getAdmin().isRpcThrottleEnabled();
|
||||
assertEquals(true, future2.get().booleanValue());
|
||||
}
|
||||
|
||||
private void assertNumResults(int expected, final QuotaFilter filter) throws Exception {
|
||||
assertEquals(expected, countResults(filter));
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
@ -47,8 +48,10 @@ import org.apache.hadoop.hbase.security.User;
|
|||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
@ -520,6 +523,44 @@ public class TestQuotaAdmin {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRpcThrottleWhenStartup() throws IOException, InterruptedException {
|
||||
TEST_UTIL.getAdmin().switchRpcThrottle(false);
|
||||
assertFalse(TEST_UTIL.getAdmin().isRpcThrottleEnabled());
|
||||
TEST_UTIL.killMiniHBaseCluster();
|
||||
|
||||
TEST_UTIL.startMiniHBaseCluster();
|
||||
assertFalse(TEST_UTIL.getAdmin().isRpcThrottleEnabled());
|
||||
for (JVMClusterUtil.RegionServerThread rs : TEST_UTIL.getHBaseCluster()
|
||||
.getRegionServerThreads()) {
|
||||
RegionServerRpcQuotaManager quotaManager =
|
||||
rs.getRegionServer().getRegionServerRpcQuotaManager();
|
||||
assertFalse(quotaManager.isRpcThrottleEnabled());
|
||||
}
|
||||
// enable rpc throttle
|
||||
TEST_UTIL.getAdmin().switchRpcThrottle(true);
|
||||
assertTrue(TEST_UTIL.getAdmin().isRpcThrottleEnabled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSwitchRpcThrottle() throws IOException {
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
testSwitchRpcThrottle(admin, true, true);
|
||||
testSwitchRpcThrottle(admin, true, false);
|
||||
testSwitchRpcThrottle(admin, false, false);
|
||||
testSwitchRpcThrottle(admin, false, true);
|
||||
}
|
||||
|
||||
private void testSwitchRpcThrottle(Admin admin, boolean oldRpcThrottle, boolean newRpcThrottle)
|
||||
throws IOException {
|
||||
boolean state = admin.switchRpcThrottle(newRpcThrottle);
|
||||
Assert.assertEquals(oldRpcThrottle, state);
|
||||
Assert.assertEquals(newRpcThrottle, admin.isRpcThrottleEnabled());
|
||||
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
|
||||
.forEach(rs -> Assert.assertEquals(newRpcThrottle,
|
||||
rs.getRegionServer().getRegionServerRpcQuotaManager().isRpcThrottleEnabled()));
|
||||
}
|
||||
|
||||
private void verifyRecordPresentInQuotaTable(ThrottleType type, long limit, TimeUnit tu)
|
||||
throws Exception {
|
||||
// Verify the RPC Quotas in the table
|
||||
|
|
|
@ -3418,6 +3418,32 @@ public class TestAccessController extends SecureTestUtil {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSwitchRpcThrottle() throws Exception {
|
||||
AccessTestAction action = new AccessTestAction() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
ACCESS_CONTROLLER.preSwitchRpcThrottle(ObserverContextImpl.createAndPrepare(CP_ENV), true);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
verifyAllowed(action, SUPERUSER, USER_ADMIN);
|
||||
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsRpcThrottleEnabled() throws Exception {
|
||||
AccessTestAction action = new AccessTestAction() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
ACCESS_CONTROLLER.preIsRpcThrottleEnabled(ObserverContextImpl.createAndPrepare(CP_ENV));
|
||||
return null;
|
||||
}
|
||||
};
|
||||
verifyAllowed(action, SUPERUSER, USER_ADMIN);
|
||||
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
|
||||
}
|
||||
|
||||
/*
|
||||
* Validate Global User ACL
|
||||
*/
|
||||
|
|
|
@ -243,6 +243,10 @@ module Hbase
|
|||
QuotaTableUtil.getObservedSnapshotSizes(@admin.getConnection)
|
||||
end
|
||||
|
||||
def switch_rpc_throttle(enabled)
|
||||
@admin.switchRpcThrottle(java.lang.Boolean.valueOf(enabled))
|
||||
end
|
||||
|
||||
def _parse_size(str_limit)
|
||||
str_limit = str_limit.downcase
|
||||
match = /(\d+)([bkmgtp%]*)/.match(str_limit)
|
||||
|
|
|
@ -435,6 +435,8 @@ Shell.load_command_group(
|
|||
list_quota_table_sizes
|
||||
list_quota_snapshots
|
||||
list_snapshot_sizes
|
||||
enable_rpc_throttle
|
||||
disable_rpc_throttle
|
||||
]
|
||||
)
|
||||
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
#
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
class DisableRpcThrottle < Command
|
||||
def help
|
||||
return <<-EOF
|
||||
Disable quota rpc throttle. Returns previous rpc throttle enabled value.
|
||||
NOTE: if quota is not enabled, this will not work and always return false.
|
||||
|
||||
Examples:
|
||||
hbase> disable_rpc_throttle
|
||||
EOF
|
||||
end
|
||||
|
||||
def command
|
||||
prev_state = quotas_admin.switch_rpc_throttle(false) ? 'true' : 'false'
|
||||
formatter.row(["Previous rpc throttle state : #{prev_state}"])
|
||||
prev_state
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,40 @@
|
|||
#
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
class EnableRpcThrottle < Command
|
||||
def help
|
||||
return <<-EOF
|
||||
Enable quota rpc throttle. Returns previous rpc throttle enabled value.
|
||||
NOTE: if quota is not enabled, this will not work and always return false.
|
||||
|
||||
Examples:
|
||||
hbase> enable_rpc_throttle
|
||||
EOF
|
||||
end
|
||||
|
||||
def command
|
||||
prev_state = quotas_admin.switch_rpc_throttle(true) ? 'true' : 'false'
|
||||
formatter.row(["Previous rpc throttle state : #{prev_state}"])
|
||||
prev_state
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -162,5 +162,13 @@ module Hbase
|
|||
output = capture_stdout{ command(:list_quotas) }
|
||||
assert(output.include?('0 row(s)'))
|
||||
end
|
||||
|
||||
define_test 'switch rpc throttle' do
|
||||
output = capture_stdout { command(:disable_rpc_throttle) }
|
||||
assert(output.include?('Previous rpc throttle state : true'))
|
||||
|
||||
output = capture_stdout { command(:enable_rpc_throttle) }
|
||||
assert(output.include?('Previous rpc throttle state : false'))
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue