From 77db1fae090bc20de62d8a86e9816c69dfb97b7a Mon Sep 17 00:00:00 2001 From: meiyi Date: Fri, 4 Jan 2019 14:43:34 +0800 Subject: [PATCH] HBASE-21159 Add shell command to switch throttle on or off Signed-off-by: Guanghao Zhang --- .../org/apache/hadoop/hbase/client/Admin.java | 13 ++ .../hadoop/hbase/client/AsyncAdmin.java | 13 ++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 10 + .../client/ConnectionImplementation.java | 16 ++ .../hadoop/hbase/client/HBaseAdmin.java | 25 +++ .../hbase/client/RawAsyncHBaseAdmin.java | 28 +++ .../client/ShortCircuitMasterConnection.java | 16 ++ .../src/main/protobuf/Master.proto | 21 +++ .../src/main/protobuf/MasterProcedure.proto | 15 ++ .../hbase/coprocessor/MasterObserver.java | 36 ++++ .../hadoop/hbase/executor/EventType.java | 6 + .../hadoop/hbase/executor/ExecutorType.java | 3 +- .../hbase/master/MasterCoprocessorHost.java | 37 ++++ .../hbase/master/MasterRpcServices.java | 24 +++ .../procedure/ServerProcedureInterface.java | 2 +- .../hbase/master/procedure/ServerQueue.java | 2 + .../procedure/SwitchRpcThrottleProcedure.java | 164 +++++++++++++++++ .../SwitchRpcThrottleRemoteProcedure.java | 171 ++++++++++++++++++ .../hbase/quotas/MasterQuotaManager.java | 54 ++++++ .../quotas/RegionServerRpcQuotaManager.java | 32 +++- .../hbase/quotas/RpcThrottleStorage.java | 69 +++++++ .../hbase/regionserver/HRegionServer.java | 2 + .../SwitchRpcThrottleRemoteCallable.java | 62 +++++++ .../security/access/AccessController.java | 12 ++ .../hbase/client/TestAsyncQuotaAdminApi.java | 9 + .../hadoop/hbase/quotas/TestQuotaAdmin.java | 41 +++++ .../security/access/TestAccessController.java | 26 +++ hbase-shell/src/main/ruby/hbase/quotas.rb | 4 + hbase-shell/src/main/ruby/shell.rb | 2 + .../shell/commands/disable_rpc_throttle.rb | 40 ++++ .../shell/commands/enable_rpc_throttle.rb | 40 ++++ .../src/test/ruby/hbase/quotas_test.rb | 8 + 32 files changed, 999 insertions(+), 4 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottleStorage.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java create mode 100644 hbase-shell/src/main/ruby/shell/commands/disable_rpc_throttle.rb create mode 100644 hbase-shell/src/main/ruby/shell/commands/enable_rpc_throttle.rb diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 08b44c93015..1d892b2bd35 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -2796,4 +2796,17 @@ public interface Admin extends Abortable, Closeable { */ void cloneTableSchema(final TableName tableName, final TableName newTableName, final boolean preserveSplits) throws IOException; + + /** + * Switch the rpc throttle enable state. + * @param enable Set to true to enable, false to disable. + * @return Previous rpc throttle enabled value + */ + boolean switchRpcThrottle(final boolean enable) throws IOException; + + /** + * Get if the rpc throttle is enabled. + * @return True if rpc throttle is enabled + */ + boolean isRpcThrottleEnabled() throws IOException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 6bb253a20ab..40ed2130267 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -1287,4 +1287,17 @@ public interface AsyncAdmin { */ CompletableFuture> compactionSwitch(boolean switchState, List serverNamesList); + + /** + * Switch the rpc throttle enabled state. + * @param enable Set to true to enable, false to disable. + * @return Previous rpc throttle enabled value + */ + CompletableFuture switchRpcThrottle(boolean enable); + + /** + * Get if the rpc throttle is enabled. + * @return True if rpc throttle is enabled + */ + CompletableFuture isRpcThrottleEnabled() throws IOException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 39eda07df32..d8f4da51890 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -765,4 +765,14 @@ class AsyncHBaseAdmin implements AsyncAdmin { List serverNamesList) { return wrap(rawAdmin.compactionSwitch(switchState, serverNamesList)); } + + @Override + public CompletableFuture switchRpcThrottle(boolean enable) { + return wrap(rawAdmin.switchRpcThrottle(enable)); + } + + @Override + public CompletableFuture isRpcThrottleEnabled() { + return wrap(rawAdmin.isRpcThrottleEnabled()); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index da6b592b2ab..992a95c5692 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -101,6 +101,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancer import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; @@ -111,6 +113,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCa import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; @@ -1757,6 +1761,18 @@ class ConnectionImplementation implements ClusterConnection, Closeable { TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException { return stub.transitReplicationPeerSyncReplicationState(controller, request); } + + @Override + public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller, + SwitchRpcThrottleRequest request) throws ServiceException { + return stub.switchRpcThrottle(controller, request); + } + + @Override + public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller, + IsRpcThrottleEnabledRequest request) throws ServiceException { + return stub.isRpcThrottleEnabled(controller, request); + } }; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 45961ff4886..034ce03b4c8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -173,6 +173,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMainte import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; @@ -4341,4 +4342,28 @@ public class HBaseAdmin implements Admin { createTable(htd); } } + + @Override + public boolean switchRpcThrottle(final boolean enable) throws IOException { + return executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { + @Override + protected Boolean rpcCall() throws Exception { + return this.master + .switchRpcThrottle(getRpcController(), MasterProtos.SwitchRpcThrottleRequest + .newBuilder().setRpcThrottleEnabled(enable).build()) + .getPreviousRpcThrottleEnabled(); + } + }); + } + + @Override + public boolean isRpcThrottleEnabled() throws IOException { + return executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { + @Override + protected Boolean rpcCall() throws Exception { + return this.master.isRpcThrottleEnabled(getRpcController(), + IsRpcThrottleEnabledRequest.newBuilder().build()).getRpcThrottleEnabled(); + } + }); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 869a63006a0..1440a647c8a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -186,6 +186,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormaliz import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; @@ -244,6 +246,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; @@ -3610,4 +3614,28 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) .serverName(serverName).call(); } + + @Override + public CompletableFuture switchRpcThrottle(boolean enable) { + CompletableFuture future = this. newMasterCaller() + .action((controller, stub) -> this + . call(controller, stub, + SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(), + (s, c, req, done) -> s.switchRpcThrottle(c, req, done), + resp -> resp.getPreviousRpcThrottleEnabled())) + .call(); + return future; + } + + @Override + public CompletableFuture isRpcThrottleEnabled() { + CompletableFuture future = this. newMasterCaller() + .action((controller, stub) -> this + . call(controller, + stub, IsRpcThrottleEnabledRequest.newBuilder().build(), + (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done), + resp -> resp.getRpcThrottleEnabled())) + .call(); + return future; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java index 7bb65d20242..197f98ba01b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java @@ -88,6 +88,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormaliz import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; @@ -146,6 +148,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; @@ -647,4 +651,16 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection { TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException { return stub.transitReplicationPeerSyncReplicationState(controller, request); } + + @Override + public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller, + SwitchRpcThrottleRequest request) throws ServiceException { + return stub.switchRpcThrottle(controller, request); + } + + @Override + public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller, + IsRpcThrottleEnabledRequest request) throws ServiceException { + return stub.isRpcThrottleEnabled(controller, request); + } } diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index 7f6513cea27..5cb4309cc62 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -635,6 +635,21 @@ message ClearDeadServersResponse { repeated ServerName server_name = 1; } +message SwitchRpcThrottleRequest { + required bool rpc_throttle_enabled = 1; +} + +message SwitchRpcThrottleResponse { + required bool previous_rpc_throttle_enabled = 1; +} + +message IsRpcThrottleEnabledRequest { +} + +message IsRpcThrottleEnabledResponse { + required bool rpc_throttle_enabled = 1; +} + service MasterService { /** Used by the client to get the number of regions that have received the updated schema */ rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest) @@ -992,6 +1007,12 @@ service MasterService { rpc ClearDeadServers(ClearDeadServersRequest) returns(ClearDeadServersResponse); + /** Turn the quota throttle on or off */ + rpc SwitchRpcThrottle (SwitchRpcThrottleRequest) returns (SwitchRpcThrottleResponse); + + /** Get if is rpc throttled enabled */ + rpc IsRpcThrottleEnabled (IsRpcThrottleEnabledRequest) + returns (IsRpcThrottleEnabledResponse); } // HBCK Service definitions. diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index cc0c6ba3472..b365373ce69 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -551,3 +551,18 @@ message OpenRegionProcedureStateData { message CloseRegionProcedureStateData { optional ServerName assign_candidate = 1; } + +enum SwitchRpcThrottleState { + UPDATE_SWITCH_RPC_THROTTLE_STORAGE = 1; + SWITCH_RPC_THROTTLE_ON_RS = 2; + POST_SWITCH_RPC_THROTTLE = 3; +} + +message SwitchRpcThrottleStateData { + required bool rpc_throttle_enabled = 1; +} + +message SwitchRpcThrottleRemoteStateData { + required ServerName target_server = 1; + required bool rpc_throttle_enabled = 2; +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java index 1a8db79bd49..5d43f10a8b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java @@ -1519,4 +1519,40 @@ public interface MasterObserver { */ default void postRecommissionRegionServer(ObserverContext ctx, ServerName server, List encodedRegionNames) throws IOException {} + + /** + * Called before switching rpc throttle enabled state. + * @param ctx the coprocessor instance's environment + * @param enable the rpc throttle value + */ + default void preSwitchRpcThrottle(final ObserverContext ctx, + final boolean enable) throws IOException { + } + + /** + * Called after switching rpc throttle enabled state. + * @param ctx the coprocessor instance's environment + * @param oldValue the previously rpc throttle value + * @param newValue the newly rpc throttle value + */ + default void postSwitchRpcThrottle(final ObserverContext ctx, + final boolean oldValue, final boolean newValue) throws IOException { + } + + /** + * Called before getting if is rpc throttle enabled. + * @param ctx the coprocessor instance's environment + */ + default void preIsRpcThrottleEnabled(final ObserverContext ctx) + throws IOException { + } + + /** + * Called after getting if is rpc throttle enabled. + * @param ctx the coprocessor instance's environment + * @param rpcThrottleEnabled the rpc throttle enabled value + */ + default void postIsRpcThrottleEnabled(final ObserverContext ctx, + final boolean rpcThrottleEnabled) throws IOException { + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index ad38d1cca8c..19264d23fe0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -140,6 +140,12 @@ public enum EventType { * Master asking RS to open a priority region. */ M_RS_OPEN_PRIORITY_REGION (26, ExecutorType.RS_OPEN_PRIORITY_REGION), + /** + * Messages originating from Master to RS.
+ * M_RS_SWITCH_RPC_THROTTLE
+ * Master asking RS to switch rpc throttle state. + */ + M_RS_SWITCH_RPC_THROTTLE(27, ExecutorType.RS_SWITCH_RPC_THROTTLE), /** * Messages originating from Client to Master.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index ea97354d8d1..819f3691ba9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -48,7 +48,8 @@ public enum ExecutorType { RS_COMPACTED_FILES_DISCHARGER (29), RS_OPEN_PRIORITY_REGION (30), RS_REFRESH_PEER(31), - RS_REPLAY_SYNC_REPLICATION_WAL(32); + RS_REPLAY_SYNC_REPLICATION_WAL(32), + RS_SWITCH_RPC_THROTTLE(33); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index e7b166c08e8..dd02a36347c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -1777,4 +1777,41 @@ public class MasterCoprocessorHost } }); } + + public void preSwitchRpcThrottle(boolean enable) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null :new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.preSwitchRpcThrottle(this, enable); + } + }); + } + + public void postSwitchRpcThrottle(final boolean oldValue, final boolean newValue) + throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.postSwitchRpcThrottle(this, oldValue, newValue); + } + }); + } + + public void preIsRpcThrottleEnabled() throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.preIsRpcThrottleEnabled(this); + } + }); + } + + public void postIsRpcThrottleEnabled(boolean enabled) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.postIsRpcThrottleEnabled(this, enabled); + } + }); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index cd838d59c47..89fcff95cbc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -258,6 +258,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; @@ -2478,6 +2480,28 @@ public class MasterRpcServices extends RSRpcServices } } + @Override + public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller, + SwitchRpcThrottleRequest request) throws ServiceException { + try { + master.checkInitialized(); + return master.getMasterQuotaManager().switchRpcThrottle(request); + } catch (Exception e) { + throw new ServiceException(e); + } + } + + @Override + public MasterProtos.IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller, + MasterProtos.IsRpcThrottleEnabledRequest request) throws ServiceException { + try { + master.checkInitialized(); + return master.getMasterQuotaManager().isRpcThrottleEnabled(request); + } catch (Exception e) { + throw new ServiceException(e); + } + } + private boolean containMetaWals(ServerName serverName) throws IOException { Path logDir = new Path(master.getWALRootDir(), AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java index f3c10efa425..7549b1366fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java @@ -27,7 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface ServerProcedureInterface { public enum ServerOperationType { - CRASH_HANDLER + CRASH_HANDLER, SWITCH_RPC_THROTTLE } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java index 3a1b3c4cd6d..9e3b3119ab6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java @@ -35,6 +35,8 @@ class ServerQueue extends Queue { switch (spi.getServerOperationType()) { case CRASH_HANDLER: return true; + case SWITCH_RPC_THROTTLE: + return false; default: break; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java new file mode 100644 index 00000000000..1b080b06731 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.quotas.RpcThrottleStorage; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * The procedure to switch rpc throttle + */ +@InterfaceAudience.Private +public class SwitchRpcThrottleProcedure + extends StateMachineProcedure + implements ServerProcedureInterface { + + private static Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleProcedure.class); + + RpcThrottleStorage rpcThrottleStorage; + boolean rpcThrottleEnabled; + ProcedurePrepareLatch syncLatch; + ServerName serverName; + int attempts; + + public SwitchRpcThrottleProcedure() { + } + + public SwitchRpcThrottleProcedure(RpcThrottleStorage rpcThrottleStorage, + boolean rpcThrottleEnabled, ServerName serverName, final ProcedurePrepareLatch syncLatch) { + this.rpcThrottleStorage = rpcThrottleStorage; + this.syncLatch = syncLatch; + this.rpcThrottleEnabled = rpcThrottleEnabled; + this.serverName = serverName; + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, SwitchRpcThrottleState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + switch (state) { + case UPDATE_SWITCH_RPC_THROTTLE_STORAGE: + try { + switchThrottleState(env, rpcThrottleEnabled); + } catch (IOException e) { + long backoff = ProcedureUtil.getBackoffTimeMs(this.attempts++); + LOG.warn("Failed to store rpc throttle value {}, sleep {} secs and retry", + rpcThrottleEnabled, backoff / 1000, e); + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + setNextState(SwitchRpcThrottleState.SWITCH_RPC_THROTTLE_ON_RS); + return Flow.HAS_MORE_STATE; + case SWITCH_RPC_THROTTLE_ON_RS: + SwitchRpcThrottleRemoteProcedure[] subProcedures = + env.getMasterServices().getServerManager().getOnlineServersList().stream() + .map(sn -> new SwitchRpcThrottleRemoteProcedure(sn, rpcThrottleEnabled)) + .toArray(SwitchRpcThrottleRemoteProcedure[]::new); + addChildProcedure(subProcedures); + setNextState(SwitchRpcThrottleState.POST_SWITCH_RPC_THROTTLE); + return Flow.HAS_MORE_STATE; + case POST_SWITCH_RPC_THROTTLE: + ProcedurePrepareLatch.releaseLatch(syncLatch, this); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + @Override + protected void rollbackState(MasterProcedureEnv env, SwitchRpcThrottleState state) + throws IOException, InterruptedException { + } + + @Override + protected SwitchRpcThrottleState getState(int stateId) { + return SwitchRpcThrottleState.forNumber(stateId); + } + + @Override + protected int getStateId(SwitchRpcThrottleState throttleState) { + return throttleState.getNumber(); + } + + @Override + protected SwitchRpcThrottleState getInitialState() { + return SwitchRpcThrottleState.UPDATE_SWITCH_RPC_THROTTLE_STORAGE; + } + + @Override + protected SwitchRpcThrottleState getCurrentState() { + return super.getCurrentState(); + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize( + SwitchRpcThrottleStateData.newBuilder().setRpcThrottleEnabled(rpcThrottleEnabled).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + SwitchRpcThrottleStateData data = serializer.deserialize(SwitchRpcThrottleStateData.class); + rpcThrottleEnabled = data.getRpcThrottleEnabled(); + } + + @Override + public ServerName getServerName() { + return serverName; + } + + @Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.SWITCH_RPC_THROTTLE; + } + + public void switchThrottleState(MasterProcedureEnv env, boolean rpcThrottleEnabled) + throws IOException { + rpcThrottleStorage.switchRpcThrottle(rpcThrottleEnabled); + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" server="); + sb.append(serverName); + sb.append(", rpcThrottleEnabled="); + sb.append(rpcThrottleEnabled); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java new file mode 100644 index 00000000000..9a56ddc3285 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.replication.regionserver.SwitchRpcThrottleRemoteCallable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData; + +/** + * The procedure to switch rpc throttle on region server + */ +@InterfaceAudience.Private +public class SwitchRpcThrottleRemoteProcedure extends Procedure + implements RemoteProcedure, ServerProcedureInterface { + + private static final Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleRemoteProcedure.class); + private ServerName targetServer; + private boolean rpcThrottleEnabled; + + public SwitchRpcThrottleRemoteProcedure() { + } + + public SwitchRpcThrottleRemoteProcedure(ServerName serverName, boolean rpcThrottleEnabled) { + this.targetServer = serverName; + this.rpcThrottleEnabled = rpcThrottleEnabled; + } + + private boolean dispatched; + private ProcedureEvent event; + private boolean succ; + + @Override + protected Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + dispatched = false; + } + try { + env.getRemoteDispatcher().addOperationToNode(targetServer, this); + } catch (FailedRemoteDispatchException frde) { + LOG.warn("Can not add remote operation for switching rpc throttle to {} on {}", + rpcThrottleEnabled, targetServer); + return null; + } + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + SwitchRpcThrottleRemoteStateData.newBuilder() + .setTargetServer(ProtobufUtil.toServerName(targetServer)) + .setRpcThrottleEnabled(rpcThrottleEnabled).build(); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + SwitchRpcThrottleRemoteStateData data = + serializer.deserialize(SwitchRpcThrottleRemoteStateData.class); + targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + rpcThrottleEnabled = data.getRpcThrottleEnabled(); + } + + @Override + public RemoteProcedureDispatcher.RemoteOperation + remoteCallBuild(MasterProcedureEnv masterProcedureEnv, ServerName remote) { + assert targetServer.equals(remote); + return new RSProcedureDispatcher.ServerOperation(this, getProcId(), + SwitchRpcThrottleRemoteCallable.class, + SwitchRpcThrottleRemoteStateData.newBuilder() + .setTargetServer(ProtobufUtil.toServerName(remote)) + .setRpcThrottleEnabled(rpcThrottleEnabled).build() + .toByteArray()); + } + + @Override + public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, + IOException exception) { + complete(env, exception); + } + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + complete(env, error); + } + + @Override + public ServerName getServerName() { + return targetServer; + } + + @Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.SWITCH_RPC_THROTTLE; + } + + private void complete(MasterProcedureEnv env, Throwable error) { + if (error != null) { + LOG.warn("Failed to switch rpc throttle to {} on server {}", rpcThrottleEnabled, targetServer, + error); + this.succ = false; + } else { + this.succ = true; + } + event.wake(env.getProcedureScheduler()); + event = null; + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" server="); + sb.append(targetServer); + sb.append(", rpcThrottleEnabled="); + sb.append(rpcThrottleEnabled); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java index bdeab8089cd..bb3cff19918 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java @@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; +import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure; import org.apache.hadoop.hbase.namespace.NamespaceAuditor; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; @@ -52,8 +54,12 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize; @@ -79,6 +85,8 @@ public class MasterQuotaManager implements RegionStateListener { private boolean initialized = false; private NamespaceAuditor namespaceQuotaManager; private ConcurrentHashMap regionSizes; + // Storage for quota rpc throttle + private RpcThrottleStorage rpcThrottleStorage; public MasterQuotaManager(final MasterServices masterServices) { this.masterServices = masterServices; @@ -107,6 +115,9 @@ public class MasterQuotaManager implements RegionStateListener { namespaceQuotaManager = new NamespaceAuditor(masterServices); namespaceQuotaManager.start(); initialized = true; + + rpcThrottleStorage = + new RpcThrottleStorage(masterServices.getZooKeeper(), masterServices.getConfiguration()); } public void stop() { @@ -310,6 +321,49 @@ public class MasterQuotaManager implements RegionStateListener { } } + public SwitchRpcThrottleResponse switchRpcThrottle(SwitchRpcThrottleRequest request) + throws IOException { + boolean rpcThrottle = request.getRpcThrottleEnabled(); + if (initialized) { + masterServices.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle); + boolean oldRpcThrottle = rpcThrottleStorage.isRpcThrottleEnabled(); + if (rpcThrottle != oldRpcThrottle) { + LOG.info("{} switch rpc throttle from {} to {}", masterServices.getClientIdAuditPrefix(), + oldRpcThrottle, rpcThrottle); + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch(); + SwitchRpcThrottleProcedure procedure = new SwitchRpcThrottleProcedure(rpcThrottleStorage, + rpcThrottle, masterServices.getServerName(), latch); + masterServices.getMasterProcedureExecutor().submitProcedure(procedure); + latch.await(); + } else { + LOG.warn("Skip switch rpc throttle to {} because it's the same with old value", + rpcThrottle); + } + SwitchRpcThrottleResponse response = SwitchRpcThrottleResponse.newBuilder() + .setPreviousRpcThrottleEnabled(oldRpcThrottle).build(); + masterServices.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle, rpcThrottle); + return response; + } else { + LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle); + return SwitchRpcThrottleResponse.newBuilder().setPreviousRpcThrottleEnabled(false).build(); + } + } + + public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request) + throws IOException { + if (initialized) { + masterServices.getMasterCoprocessorHost().preIsRpcThrottleEnabled(); + boolean enabled = rpcThrottleStorage.isRpcThrottleEnabled(); + IsRpcThrottleEnabledResponse response = + IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(enabled).build(); + masterServices.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled); + return response; + } else { + LOG.warn("Skip get rpc throttle because rpc quota is disabled"); + return IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(false).build(); + } + } + private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps) throws IOException, InterruptedException { if (req.hasRemoveAll() && req.getRemoveAll() == true) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 40e70dcadc6..9b3d48aef5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -52,9 +52,14 @@ public class RegionServerRpcQuotaManager { private final RegionServerServices rsServices; private QuotaCache quotaCache = null; + private volatile boolean rpcThrottleEnabled; + // Storage for quota rpc throttle + private RpcThrottleStorage rpcThrottleStorage; public RegionServerRpcQuotaManager(final RegionServerServices rsServices) { this.rsServices = rsServices; + rpcThrottleStorage = + new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration()); } public void start(final RpcScheduler rpcScheduler) throws IOException { @@ -68,6 +73,8 @@ public class RegionServerRpcQuotaManager { // Initialize quota cache quotaCache = new QuotaCache(rsServices); quotaCache.start(); + rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled(); + LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled); } public void stop() { @@ -76,10 +83,31 @@ public class RegionServerRpcQuotaManager { } } - public boolean isQuotaEnabled() { + @VisibleForTesting + protected boolean isRpcThrottleEnabled() { + return rpcThrottleEnabled; + } + + private boolean isQuotaEnabled() { return quotaCache != null; } + public void switchRpcThrottle(boolean enable) throws IOException { + if (isQuotaEnabled()) { + if (rpcThrottleEnabled != enable) { + boolean previousEnabled = rpcThrottleEnabled; + rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled(); + LOG.info("Switch rpc throttle from {} to {}", previousEnabled, rpcThrottleEnabled); + } else { + LOG.warn( + "Skip switch rpc throttle because previous value {} is the same as current value {}", + rpcThrottleEnabled, enable); + } + } else { + LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable); + } + } + @VisibleForTesting QuotaCache getQuotaCache() { return quotaCache; @@ -93,7 +121,7 @@ public class RegionServerRpcQuotaManager { * @return the OperationQuota */ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) { - if (isQuotaEnabled() && !table.isSystemTable()) { + if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) { UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table); boolean useNoop = userLimiter.isBypass(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottleStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottleStorage.java new file mode 100644 index 00000000000..ba21f6e3f2e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottleStorage.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; + +/** + * ZK based rpc throttle storage. + */ +@InterfaceAudience.Private +public class RpcThrottleStorage { + public static final String RPC_THROTTLE_ZNODE = "zookeeper.znode.quota.rpc.throttle"; + public static final String RPC_THROTTLE_ZNODE_DEFAULT = "rpc-throttle"; + + private final ZKWatcher zookeeper; + private final String rpcThrottleZNode; + + public RpcThrottleStorage(ZKWatcher zookeeper, Configuration conf) { + this.zookeeper = zookeeper; + this.rpcThrottleZNode = ZNodePaths.joinZNode(zookeeper.getZNodePaths().baseZNode, + conf.get(RPC_THROTTLE_ZNODE, RPC_THROTTLE_ZNODE_DEFAULT)); + } + + public boolean isRpcThrottleEnabled() throws IOException { + try { + byte[] upData = ZKUtil.getData(zookeeper, rpcThrottleZNode); + return upData == null || Bytes.toBoolean(upData); + } catch (KeeperException | InterruptedException e) { + throw new IOException("Failed to get rpc throttle", e); + } + } + + /** + * Store the rpc throttle value. + * @param enable Set to true to enable, false to disable. + * @throws IOException if an unexpected io exception occurs + */ + public void switchRpcThrottle(boolean enable) throws IOException { + try { + byte[] upData = Bytes.toBytes(enable); + ZKUtil.createSetData(zookeeper, rpcThrottleZNode, upData); + } catch (KeeperException e) { + throw new IOException("Failed to store rpc throttle", e); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 13f277b23b9..6e8af1878fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1963,6 +1963,8 @@ public class HRegionServer extends HasThread implements conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2)); this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL, conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1)); + this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE, + conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1)); Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java new file mode 100644 index 00000000000..b2e698f0d03 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData; + +/** + * The callable executed at RS side to switch rpc throttle state.
+ */ +@InterfaceAudience.Private +public class SwitchRpcThrottleRemoteCallable implements RSProcedureCallable { + private HRegionServer rs; + private boolean rpcThrottleEnabled; + private Exception initError; + + @Override + public Void call() throws Exception { + if (initError != null) { + throw initError; + } + rs.getRegionServerRpcQuotaManager().switchRpcThrottle(rpcThrottleEnabled); + return null; + } + + @Override + public void init(byte[] parameter, HRegionServer rs) { + this.rs = rs; + try { + SwitchRpcThrottleRemoteStateData param = + SwitchRpcThrottleRemoteStateData.parseFrom(parameter); + rpcThrottleEnabled = param.getRpcThrottleEnabled(); + } catch (InvalidProtocolBufferException e) { + initError = e; + } + } + + @Override + public EventType getEventType() { + return EventType.M_RS_SWITCH_RPC_THROTTLE; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 6e2c9cef8a3..bbf129dab33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -2602,6 +2602,18 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor, checkSystemOrSuperUser(getActiveUser(ctx)); } + @Override + public void preSwitchRpcThrottle(ObserverContext ctx, + boolean enable) throws IOException { + requirePermission(ctx, "switchRpcThrottle", Action.ADMIN); + } + + @Override + public void preIsRpcThrottleEnabled(ObserverContext ctx) + throws IOException { + requirePermission(ctx, "isRpcThrottleEnabled", Action.ADMIN); + } + /** * Returns the active user to which authorization checks should be applied. * If we are in the context of an RPC call, the remote user is used, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java index fc8a0ca5ad7..707cc8750d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; @@ -178,6 +179,14 @@ public class TestAsyncQuotaAdminApi extends TestAsyncAdminBase { assertNumResults(0, null); } + @Test + public void testSwitchRpcThrottle() throws Exception { + CompletableFuture future1 = ASYNC_CONN.getAdmin().switchRpcThrottle(true); + assertEquals(true, future1.get().booleanValue()); + CompletableFuture future2 = ASYNC_CONN.getAdmin().isRpcThrottleEnabled(); + assertEquals(true, future2.get().booleanValue()); + } + private void assertNumResults(int expected, final QuotaFilter filter) throws Exception { assertEquals(expected, countResults(filter)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java index 03e0aa590ed..ef7e47d66f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.quotas; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -47,8 +48,10 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -520,6 +523,44 @@ public class TestQuotaAdmin { } + @Test + public void testRpcThrottleWhenStartup() throws IOException, InterruptedException { + TEST_UTIL.getAdmin().switchRpcThrottle(false); + assertFalse(TEST_UTIL.getAdmin().isRpcThrottleEnabled()); + TEST_UTIL.killMiniHBaseCluster(); + + TEST_UTIL.startMiniHBaseCluster(); + assertFalse(TEST_UTIL.getAdmin().isRpcThrottleEnabled()); + for (JVMClusterUtil.RegionServerThread rs : TEST_UTIL.getHBaseCluster() + .getRegionServerThreads()) { + RegionServerRpcQuotaManager quotaManager = + rs.getRegionServer().getRegionServerRpcQuotaManager(); + assertFalse(quotaManager.isRpcThrottleEnabled()); + } + // enable rpc throttle + TEST_UTIL.getAdmin().switchRpcThrottle(true); + assertTrue(TEST_UTIL.getAdmin().isRpcThrottleEnabled()); + } + + @Test + public void testSwitchRpcThrottle() throws IOException { + Admin admin = TEST_UTIL.getAdmin(); + testSwitchRpcThrottle(admin, true, true); + testSwitchRpcThrottle(admin, true, false); + testSwitchRpcThrottle(admin, false, false); + testSwitchRpcThrottle(admin, false, true); + } + + private void testSwitchRpcThrottle(Admin admin, boolean oldRpcThrottle, boolean newRpcThrottle) + throws IOException { + boolean state = admin.switchRpcThrottle(newRpcThrottle); + Assert.assertEquals(oldRpcThrottle, state); + Assert.assertEquals(newRpcThrottle, admin.isRpcThrottleEnabled()); + TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() + .forEach(rs -> Assert.assertEquals(newRpcThrottle, + rs.getRegionServer().getRegionServerRpcQuotaManager().isRpcThrottleEnabled())); + } + private void verifyRecordPresentInQuotaTable(ThrottleType type, long limit, TimeUnit tu) throws Exception { // Verify the RPC Quotas in the table diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index 1b70054b481..2d37f305aea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -3418,6 +3418,32 @@ public class TestAccessController extends SecureTestUtil { } } + @Test + public void testSwitchRpcThrottle() throws Exception { + AccessTestAction action = new AccessTestAction() { + @Override + public Object run() throws Exception { + ACCESS_CONTROLLER.preSwitchRpcThrottle(ObserverContextImpl.createAndPrepare(CP_ENV), true); + return null; + } + }; + verifyAllowed(action, SUPERUSER, USER_ADMIN); + verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER); + } + + @Test + public void testIsRpcThrottleEnabled() throws Exception { + AccessTestAction action = new AccessTestAction() { + @Override + public Object run() throws Exception { + ACCESS_CONTROLLER.preIsRpcThrottleEnabled(ObserverContextImpl.createAndPrepare(CP_ENV)); + return null; + } + }; + verifyAllowed(action, SUPERUSER, USER_ADMIN); + verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER); + } + /* * Validate Global User ACL */ diff --git a/hbase-shell/src/main/ruby/hbase/quotas.rb b/hbase-shell/src/main/ruby/hbase/quotas.rb index 1ba95946eb1..38cb3e334dd 100644 --- a/hbase-shell/src/main/ruby/hbase/quotas.rb +++ b/hbase-shell/src/main/ruby/hbase/quotas.rb @@ -243,6 +243,10 @@ module Hbase QuotaTableUtil.getObservedSnapshotSizes(@admin.getConnection) end + def switch_rpc_throttle(enabled) + @admin.switchRpcThrottle(java.lang.Boolean.valueOf(enabled)) + end + def _parse_size(str_limit) str_limit = str_limit.downcase match = /(\d+)([bkmgtp%]*)/.match(str_limit) diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 1f7eae6b17b..62a8baef8cf 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -435,6 +435,8 @@ Shell.load_command_group( list_quota_table_sizes list_quota_snapshots list_snapshot_sizes + enable_rpc_throttle + disable_rpc_throttle ] ) diff --git a/hbase-shell/src/main/ruby/shell/commands/disable_rpc_throttle.rb b/hbase-shell/src/main/ruby/shell/commands/disable_rpc_throttle.rb new file mode 100644 index 00000000000..8ecf6f6d64c --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/disable_rpc_throttle.rb @@ -0,0 +1,40 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class DisableRpcThrottle < Command + def help + return <<-EOF +Disable quota rpc throttle. Returns previous rpc throttle enabled value. +NOTE: if quota is not enabled, this will not work and always return false. + +Examples: + hbase> disable_rpc_throttle + EOF + end + + def command + prev_state = quotas_admin.switch_rpc_throttle(false) ? 'true' : 'false' + formatter.row(["Previous rpc throttle state : #{prev_state}"]) + prev_state + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/enable_rpc_throttle.rb b/hbase-shell/src/main/ruby/shell/commands/enable_rpc_throttle.rb new file mode 100644 index 00000000000..a68c74064d0 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/enable_rpc_throttle.rb @@ -0,0 +1,40 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class EnableRpcThrottle < Command + def help + return <<-EOF +Enable quota rpc throttle. Returns previous rpc throttle enabled value. +NOTE: if quota is not enabled, this will not work and always return false. + +Examples: + hbase> enable_rpc_throttle + EOF + end + + def command + prev_state = quotas_admin.switch_rpc_throttle(true) ? 'true' : 'false' + formatter.row(["Previous rpc throttle state : #{prev_state}"]) + prev_state + end + end + end +end diff --git a/hbase-shell/src/test/ruby/hbase/quotas_test.rb b/hbase-shell/src/test/ruby/hbase/quotas_test.rb index 295d545fb6e..981001a693d 100644 --- a/hbase-shell/src/test/ruby/hbase/quotas_test.rb +++ b/hbase-shell/src/test/ruby/hbase/quotas_test.rb @@ -162,5 +162,13 @@ module Hbase output = capture_stdout{ command(:list_quotas) } assert(output.include?('0 row(s)')) end + + define_test 'switch rpc throttle' do + output = capture_stdout { command(:disable_rpc_throttle) } + assert(output.include?('Previous rpc throttle state : true')) + + output = capture_stdout { command(:enable_rpc_throttle) } + assert(output.include?('Previous rpc throttle state : false')) + end end end