HBASE-21710 Add quota related methods to the Admin interface
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
4e792414f6
commit
ebf4fe3bb9
|
@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaFilter;
|
import org.apache.hadoop.hbase.quotas.QuotaFilter;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaRetriever;
|
import org.apache.hadoop.hbase.quotas.QuotaRetriever;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaSettings;
|
import org.apache.hadoop.hbase.quotas.QuotaSettings;
|
||||||
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotView;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
|
@ -2770,4 +2771,27 @@ public interface Admin extends Abortable, Closeable {
|
||||||
* @return True if rpc throttle is enabled
|
* @return True if rpc throttle is enabled
|
||||||
*/
|
*/
|
||||||
boolean isRpcThrottleEnabled() throws IOException;
|
boolean isRpcThrottleEnabled() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetches the table sizes on the filesystem as tracked by the HBase Master.
|
||||||
|
*/
|
||||||
|
Map<TableName, Long> getSpaceQuotaTableSizes() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetches the observed {@link SpaceQuotaSnapshotView}s observed by a RegionServer.
|
||||||
|
*/
|
||||||
|
Map<TableName, ? extends SpaceQuotaSnapshotView> getRegionServerSpaceQuotaSnapshots(
|
||||||
|
ServerName serverName) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the Master's view of a quota on the given {@code namespace} or null if the Master has
|
||||||
|
* no quota information on that namespace.
|
||||||
|
*/
|
||||||
|
SpaceQuotaSnapshotView getCurrentSpaceQuotaSnapshot(String namespace) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the Master's view of a quota on the given {@code tableName} or null if the Master has
|
||||||
|
* no quota information on that table.
|
||||||
|
*/
|
||||||
|
SpaceQuotaSnapshotView getCurrentSpaceQuotaSnapshot(TableName tableName) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import com.google.protobuf.RpcChannel;
|
import com.google.protobuf.RpcChannel;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -28,7 +27,6 @@ import java.util.Set;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.CacheEvictionStats;
|
import org.apache.hadoop.hbase.CacheEvictionStats;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
||||||
|
@ -40,6 +38,7 @@ import org.apache.hadoop.hbase.client.replication.TableCFs;
|
||||||
import org.apache.hadoop.hbase.client.security.SecurityCapability;
|
import org.apache.hadoop.hbase.client.security.SecurityCapability;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaFilter;
|
import org.apache.hadoop.hbase.quotas.QuotaFilter;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaSettings;
|
import org.apache.hadoop.hbase.quotas.QuotaSettings;
|
||||||
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotView;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -1269,4 +1268,29 @@ public interface AsyncAdmin {
|
||||||
* @return True if rpc throttle is enabled
|
* @return True if rpc throttle is enabled
|
||||||
*/
|
*/
|
||||||
CompletableFuture<Boolean> isRpcThrottleEnabled();
|
CompletableFuture<Boolean> isRpcThrottleEnabled();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetches the table sizes on the filesystem as tracked by the HBase Master.
|
||||||
|
*/
|
||||||
|
CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetches the observed {@link SpaceQuotaSnapshotView}s observed by a RegionServer.
|
||||||
|
*/
|
||||||
|
CompletableFuture<? extends Map<TableName, ? extends SpaceQuotaSnapshotView>>
|
||||||
|
getRegionServerSpaceQuotaSnapshots(ServerName serverName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the Master's view of a quota on the given {@code namespace} or null if the Master has
|
||||||
|
* no quota information on that namespace.
|
||||||
|
*/
|
||||||
|
CompletableFuture<? extends SpaceQuotaSnapshotView>
|
||||||
|
getCurrentSpaceQuotaSnapshot(String namespace);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the Master's view of a quota on the given {@code tableName} or null if the Master has
|
||||||
|
* no quota information on that table.
|
||||||
|
*/
|
||||||
|
CompletableFuture<? extends SpaceQuotaSnapshotView> getCurrentSpaceQuotaSnapshot(
|
||||||
|
TableName tableName);
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,6 @@ import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.CacheEvictionStats;
|
import org.apache.hadoop.hbase.CacheEvictionStats;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
||||||
|
@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.client.replication.TableCFs;
|
||||||
import org.apache.hadoop.hbase.client.security.SecurityCapability;
|
import org.apache.hadoop.hbase.client.security.SecurityCapability;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaFilter;
|
import org.apache.hadoop.hbase.quotas.QuotaFilter;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaSettings;
|
import org.apache.hadoop.hbase.quotas.QuotaSettings;
|
||||||
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -768,4 +768,25 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
||||||
public CompletableFuture<Boolean> isRpcThrottleEnabled() {
|
public CompletableFuture<Boolean> isRpcThrottleEnabled() {
|
||||||
return wrap(rawAdmin.isRpcThrottleEnabled());
|
return wrap(rawAdmin.isRpcThrottleEnabled());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
|
||||||
|
return wrap(rawAdmin.getSpaceQuotaTableSizes());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
|
||||||
|
ServerName serverName) {
|
||||||
|
return wrap(rawAdmin.getRegionServerSpaceQuotaSnapshots(serverName));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
|
||||||
|
return wrap(rawAdmin.getCurrentSpaceQuotaSnapshot(namespace));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
|
||||||
|
return wrap(rawAdmin.getCurrentSpaceQuotaSnapshot(tableName));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaFilter;
|
import org.apache.hadoop.hbase.quotas.QuotaFilter;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaRetriever;
|
import org.apache.hadoop.hbase.quotas.QuotaRetriever;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaSettings;
|
import org.apache.hadoop.hbase.quotas.QuotaSettings;
|
||||||
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
|
@ -106,6 +107,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||||
|
@ -204,6 +206,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMaster
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
|
||||||
|
@ -4340,4 +4347,82 @@ public class HBaseAdmin implements Admin {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<TableName, Long> getSpaceQuotaTableSizes() throws IOException {
|
||||||
|
return executeCallable(
|
||||||
|
new MasterCallable<Map<TableName, Long>>(getConnection(), getRpcControllerFactory()) {
|
||||||
|
@Override
|
||||||
|
protected Map<TableName, Long> rpcCall() throws Exception {
|
||||||
|
GetSpaceQuotaRegionSizesResponse resp = master.getSpaceQuotaRegionSizes(
|
||||||
|
getRpcController(), RequestConverter.buildGetSpaceQuotaRegionSizesRequest());
|
||||||
|
Map<TableName, Long> tableSizes = new HashMap<>();
|
||||||
|
for (RegionSizes sizes : resp.getSizesList()) {
|
||||||
|
TableName tn = ProtobufUtil.toTableName(sizes.getTableName());
|
||||||
|
tableSizes.put(tn, sizes.getSize());
|
||||||
|
}
|
||||||
|
return tableSizes;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<TableName, SpaceQuotaSnapshot> getRegionServerSpaceQuotaSnapshots(
|
||||||
|
ServerName serverName) throws IOException {
|
||||||
|
final AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
|
||||||
|
Callable<GetSpaceQuotaSnapshotsResponse> callable =
|
||||||
|
new Callable<GetSpaceQuotaSnapshotsResponse>() {
|
||||||
|
@Override
|
||||||
|
public GetSpaceQuotaSnapshotsResponse call() throws Exception {
|
||||||
|
return admin.getSpaceQuotaSnapshots(rpcControllerFactory.newController(),
|
||||||
|
RequestConverter.buildGetSpaceQuotaSnapshotsRequest());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
GetSpaceQuotaSnapshotsResponse resp = ProtobufUtil.call(callable);
|
||||||
|
Map<TableName, SpaceQuotaSnapshot> snapshots = new HashMap<>();
|
||||||
|
for (TableQuotaSnapshot snapshot : resp.getSnapshotsList()) {
|
||||||
|
snapshots.put(ProtobufUtil.toTableName(snapshot.getTableName()),
|
||||||
|
SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()));
|
||||||
|
}
|
||||||
|
return snapshots;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SpaceQuotaSnapshot getCurrentSpaceQuotaSnapshot(String namespace) throws IOException {
|
||||||
|
return executeCallable(
|
||||||
|
new MasterCallable<SpaceQuotaSnapshot>(getConnection(), getRpcControllerFactory()) {
|
||||||
|
@Override
|
||||||
|
protected SpaceQuotaSnapshot rpcCall() throws Exception {
|
||||||
|
GetQuotaStatesResponse resp = master.getQuotaStates(getRpcController(),
|
||||||
|
RequestConverter.buildGetQuotaStatesRequest());
|
||||||
|
for (GetQuotaStatesResponse.NamespaceQuotaSnapshot nsSnapshot : resp
|
||||||
|
.getNsSnapshotsList()) {
|
||||||
|
if (namespace.equals(nsSnapshot.getNamespace())) {
|
||||||
|
return SpaceQuotaSnapshot.toSpaceQuotaSnapshot(nsSnapshot.getSnapshot());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SpaceQuotaSnapshot getCurrentSpaceQuotaSnapshot(TableName tableName) throws IOException {
|
||||||
|
return executeCallable(
|
||||||
|
new MasterCallable<SpaceQuotaSnapshot>(getConnection(), getRpcControllerFactory()) {
|
||||||
|
@Override
|
||||||
|
protected SpaceQuotaSnapshot rpcCall() throws Exception {
|
||||||
|
GetQuotaStatesResponse resp = master.getQuotaStates(getRpcController(),
|
||||||
|
RequestConverter.buildGetQuotaStatesRequest());
|
||||||
|
HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
|
||||||
|
for (GetQuotaStatesResponse.TableQuotaSnapshot tableSnapshot : resp
|
||||||
|
.getTableSnapshotsList()) {
|
||||||
|
if (protoTableName.equals(tableSnapshot.getTableName())) {
|
||||||
|
return SpaceQuotaSnapshot.toSpaceQuotaSnapshot(tableSnapshot.getSnapshot());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,129 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to you under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.client;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Client class to wrap RPCs to HBase servers for space quota status information.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class QuotaStatusCalls {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See {@link #getMasterRegionSizes(Connection, RpcControllerFactory, RpcRetryingCallerFactory, int)}
|
|
||||||
*/
|
|
||||||
public static GetSpaceQuotaRegionSizesResponse getMasterRegionSizes(
|
|
||||||
ClusterConnection clusterConn, int timeout) throws IOException {
|
|
||||||
RpcControllerFactory rpcController = clusterConn.getRpcControllerFactory();
|
|
||||||
RpcRetryingCallerFactory rpcCaller = clusterConn.getRpcRetryingCallerFactory();
|
|
||||||
return getMasterRegionSizes(clusterConn, rpcController, rpcCaller, timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Executes an RPC to the HBase master to fetch its view on the Region sizes.
|
|
||||||
*/
|
|
||||||
public static GetSpaceQuotaRegionSizesResponse getMasterRegionSizes(
|
|
||||||
Connection conn, RpcControllerFactory factory, RpcRetryingCallerFactory rpcCaller,
|
|
||||||
int timeout) throws IOException {
|
|
||||||
MasterCallable<GetSpaceQuotaRegionSizesResponse> callable =
|
|
||||||
new MasterCallable<GetSpaceQuotaRegionSizesResponse>(conn, factory) {
|
|
||||||
@Override
|
|
||||||
protected GetSpaceQuotaRegionSizesResponse rpcCall() throws Exception {
|
|
||||||
return master.getSpaceQuotaRegionSizes(
|
|
||||||
getRpcController(), RequestConverter.buildGetSpaceQuotaRegionSizesRequest());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
RpcRetryingCaller<GetSpaceQuotaRegionSizesResponse> caller = rpcCaller.newCaller();
|
|
||||||
try {
|
|
||||||
return caller.callWithoutRetries(callable, timeout);
|
|
||||||
} finally {
|
|
||||||
callable.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See {@link #getMasterQuotaStates(Connection, RpcControllerFactory, RpcRetryingCallerFactory, int)}
|
|
||||||
*/
|
|
||||||
public static GetQuotaStatesResponse getMasterQuotaStates(
|
|
||||||
ClusterConnection clusterConn, int timeout) throws IOException {
|
|
||||||
RpcControllerFactory rpcController = clusterConn.getRpcControllerFactory();
|
|
||||||
RpcRetryingCallerFactory rpcCaller = clusterConn.getRpcRetryingCallerFactory();
|
|
||||||
return getMasterQuotaStates(clusterConn, rpcController, rpcCaller, timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Executes an RPC tot he HBase master to fetch its view on space quotas.
|
|
||||||
*/
|
|
||||||
public static GetQuotaStatesResponse getMasterQuotaStates(
|
|
||||||
Connection conn, RpcControllerFactory factory, RpcRetryingCallerFactory rpcCaller,
|
|
||||||
int timeout) throws IOException {
|
|
||||||
MasterCallable<GetQuotaStatesResponse> callable =
|
|
||||||
new MasterCallable<GetQuotaStatesResponse>(conn, factory) {
|
|
||||||
@Override
|
|
||||||
protected GetQuotaStatesResponse rpcCall() throws Exception {
|
|
||||||
return master.getQuotaStates(
|
|
||||||
getRpcController(), RequestConverter.buildGetQuotaStatesRequest());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
RpcRetryingCaller<GetQuotaStatesResponse> caller = rpcCaller.newCaller();
|
|
||||||
try {
|
|
||||||
return caller.callWithoutRetries(callable, timeout);
|
|
||||||
} finally {
|
|
||||||
callable.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See {@link #getRegionServerQuotaSnapshot(ClusterConnection, RpcControllerFactory, int, ServerName)}
|
|
||||||
*/
|
|
||||||
public static GetSpaceQuotaSnapshotsResponse getRegionServerQuotaSnapshot(
|
|
||||||
ClusterConnection clusterConn, int timeout, ServerName sn) throws IOException {
|
|
||||||
RpcControllerFactory rpcController = clusterConn.getRpcControllerFactory();
|
|
||||||
return getRegionServerQuotaSnapshot(clusterConn, rpcController, timeout, sn);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Executes an RPC to the RegionServer identified by the {@code ServerName} to fetch its view
|
|
||||||
* on space quotas.
|
|
||||||
*/
|
|
||||||
public static GetSpaceQuotaSnapshotsResponse getRegionServerQuotaSnapshot(
|
|
||||||
ClusterConnection conn, RpcControllerFactory factory,
|
|
||||||
int timeout, ServerName sn) throws IOException {
|
|
||||||
final AdminService.BlockingInterface admin = conn.getAdmin(sn);
|
|
||||||
Callable<GetSpaceQuotaSnapshotsResponse> callable =
|
|
||||||
new Callable<GetSpaceQuotaSnapshotsResponse>() {
|
|
||||||
@Override
|
|
||||||
public GetSpaceQuotaSnapshotsResponse call() throws Exception {
|
|
||||||
return admin.getSpaceQuotaSnapshots(
|
|
||||||
factory.newController(), RequestConverter.buildGetSpaceQuotaSnapshotsRequest());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
return ProtobufUtil.call(callable);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaFilter;
|
import org.apache.hadoop.hbase.quotas.QuotaFilter;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaSettings;
|
import org.apache.hadoop.hbase.quotas.QuotaSettings;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
|
import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
|
||||||
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||||
|
@ -122,6 +123,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerR
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
|
||||||
|
@ -251,6 +253,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTa
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
|
||||||
|
@ -3638,4 +3647,56 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
||||||
.call();
|
.call();
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
|
||||||
|
return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
|
||||||
|
.<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
|
||||||
|
Map<TableName, Long>> call(controller, stub,
|
||||||
|
RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
|
||||||
|
(s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
|
||||||
|
resp -> resp.getSizesList().stream().collect(Collectors
|
||||||
|
.toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
|
||||||
|
.call();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
|
||||||
|
ServerName serverName) {
|
||||||
|
return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
|
||||||
|
.action((controller, stub) -> this
|
||||||
|
.<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
|
||||||
|
Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
|
||||||
|
RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
|
||||||
|
(s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
|
||||||
|
resp -> resp.getSnapshotsList().stream()
|
||||||
|
.collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
|
||||||
|
snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
|
||||||
|
.serverName(serverName).call();
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
|
||||||
|
Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
|
||||||
|
return this.<SpaceQuotaSnapshot> newMasterCaller()
|
||||||
|
.action((controller, stub) -> this
|
||||||
|
.<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
|
||||||
|
RequestConverter.buildGetQuotaStatesRequest(),
|
||||||
|
(s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
|
||||||
|
.call();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
|
||||||
|
return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
|
||||||
|
.filter(s -> s.getNamespace().equals(namespace)).findFirst()
|
||||||
|
.map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
|
||||||
|
HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
|
||||||
|
return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
|
||||||
|
.filter(s -> s.getTableName().equals(protoTableName)).findFirst()
|
||||||
|
.map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,23 +27,15 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.CompareOperator;
|
import org.apache.hadoop.hbase.CompareOperator;
|
||||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.QuotaStatusCalls;
|
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
@ -55,21 +47,20 @@ import org.apache.hadoop.hbase.filter.QualifierFilter;
|
||||||
import org.apache.hadoop.hbase.filter.RegexStringComparator;
|
import org.apache.hadoop.hbase.filter.RegexStringComparator;
|
||||||
import org.apache.hadoop.hbase.filter.RowFilter;
|
import org.apache.hadoop.hbase.filter.RowFilter;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
|
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
|
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class to interact with the quota table.
|
* Helper class to interact with the quota table.
|
||||||
|
@ -561,87 +552,6 @@ public class QuotaTableUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* =========================================================================
|
|
||||||
* Space quota status RPC helpers
|
|
||||||
*/
|
|
||||||
/**
|
|
||||||
* Fetches the table sizes on the filesystem as tracked by the HBase Master.
|
|
||||||
*/
|
|
||||||
public static Map<TableName,Long> getMasterReportedTableSizes(
|
|
||||||
Connection conn) throws IOException {
|
|
||||||
if (!(conn instanceof ClusterConnection)) {
|
|
||||||
throw new IllegalArgumentException("Expected a ClusterConnection");
|
|
||||||
}
|
|
||||||
ClusterConnection clusterConn = (ClusterConnection) conn;
|
|
||||||
GetSpaceQuotaRegionSizesResponse response = QuotaStatusCalls.getMasterRegionSizes(
|
|
||||||
clusterConn, 0);
|
|
||||||
Map<TableName,Long> tableSizes = new HashMap<>();
|
|
||||||
for (RegionSizes sizes : response.getSizesList()) {
|
|
||||||
TableName tn = ProtobufUtil.toTableName(sizes.getTableName());
|
|
||||||
tableSizes.put(tn, sizes.getSize());
|
|
||||||
}
|
|
||||||
return tableSizes;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Fetches the observed {@link SpaceQuotaSnapshot}s observed by a RegionServer.
|
|
||||||
*/
|
|
||||||
public static Map<TableName,SpaceQuotaSnapshot> getRegionServerQuotaSnapshots(
|
|
||||||
Connection conn, ServerName regionServer) throws IOException {
|
|
||||||
if (!(conn instanceof ClusterConnection)) {
|
|
||||||
throw new IllegalArgumentException("Expected a ClusterConnection");
|
|
||||||
}
|
|
||||||
ClusterConnection clusterConn = (ClusterConnection) conn;
|
|
||||||
GetSpaceQuotaSnapshotsResponse response = QuotaStatusCalls.getRegionServerQuotaSnapshot(
|
|
||||||
clusterConn, 0, regionServer);
|
|
||||||
Map<TableName,SpaceQuotaSnapshot> snapshots = new HashMap<>();
|
|
||||||
for (TableQuotaSnapshot snapshot : response.getSnapshotsList()) {
|
|
||||||
snapshots.put(
|
|
||||||
ProtobufUtil.toTableName(snapshot.getTableName()),
|
|
||||||
SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()));
|
|
||||||
}
|
|
||||||
return snapshots;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the Master's view of a quota on the given {@code tableName} or null if the
|
|
||||||
* Master has no quota information on that table.
|
|
||||||
*/
|
|
||||||
public static SpaceQuotaSnapshot getCurrentSnapshot(
|
|
||||||
Connection conn, TableName tn) throws IOException {
|
|
||||||
if (!(conn instanceof ClusterConnection)) {
|
|
||||||
throw new IllegalArgumentException("Expected a ClusterConnection");
|
|
||||||
}
|
|
||||||
ClusterConnection clusterConn = (ClusterConnection) conn;
|
|
||||||
GetQuotaStatesResponse resp = QuotaStatusCalls.getMasterQuotaStates(clusterConn, 0);
|
|
||||||
HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tn);
|
|
||||||
for (GetQuotaStatesResponse.TableQuotaSnapshot tableSnapshot : resp.getTableSnapshotsList()) {
|
|
||||||
if (protoTableName.equals(tableSnapshot.getTableName())) {
|
|
||||||
return SpaceQuotaSnapshot.toSpaceQuotaSnapshot(tableSnapshot.getSnapshot());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the Master's view of a quota on the given {@code namespace} or null if the
|
|
||||||
* Master has no quota information on that namespace.
|
|
||||||
*/
|
|
||||||
public static SpaceQuotaSnapshot getCurrentSnapshot(
|
|
||||||
Connection conn, String namespace) throws IOException {
|
|
||||||
if (!(conn instanceof ClusterConnection)) {
|
|
||||||
throw new IllegalArgumentException("Expected a ClusterConnection");
|
|
||||||
}
|
|
||||||
ClusterConnection clusterConn = (ClusterConnection) conn;
|
|
||||||
GetQuotaStatesResponse resp = QuotaStatusCalls.getMasterQuotaStates(clusterConn, 0);
|
|
||||||
for (GetQuotaStatesResponse.NamespaceQuotaSnapshot nsSnapshot : resp.getNsSnapshotsList()) {
|
|
||||||
if (namespace.equals(nsSnapshot.getNamespace())) {
|
|
||||||
return SpaceQuotaSnapshot.toSpaceQuotaSnapshot(nsSnapshot.getSnapshot());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* =========================================================================
|
/* =========================================================================
|
||||||
* Quotas protobuf helpers
|
* Quotas protobuf helpers
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
package org.apache.hadoop.hbase.quotas;
|
package org.apache.hadoop.hbase.quotas;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
|
@ -28,7 +28,7 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
* A point-in-time view of a space quota on a table.
|
* A point-in-time view of a space quota on a table.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class SpaceQuotaSnapshot {
|
public class SpaceQuotaSnapshot implements SpaceQuotaSnapshotView {
|
||||||
private static final SpaceQuotaSnapshot NO_SUCH_SNAPSHOT = new SpaceQuotaSnapshot(
|
private static final SpaceQuotaSnapshot NO_SUCH_SNAPSHOT = new SpaceQuotaSnapshot(
|
||||||
SpaceQuotaStatus.notInViolation(), 0, Long.MAX_VALUE);
|
SpaceQuotaStatus.notInViolation(), 0, Long.MAX_VALUE);
|
||||||
private final SpaceQuotaStatus quotaStatus;
|
private final SpaceQuotaStatus quotaStatus;
|
||||||
|
@ -41,26 +41,25 @@ public class SpaceQuotaSnapshot {
|
||||||
* there is guaranteed to be a non-null violation policy.
|
* there is guaranteed to be a non-null violation policy.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public static class SpaceQuotaStatus {
|
public static class SpaceQuotaStatus implements SpaceQuotaStatusView {
|
||||||
private static final SpaceQuotaStatus NOT_IN_VIOLATION = new SpaceQuotaStatus(null, false);
|
private static final SpaceQuotaStatus NOT_IN_VIOLATION = new SpaceQuotaStatus(null, false);
|
||||||
final SpaceViolationPolicy policy;
|
final Optional<SpaceViolationPolicy> policy;
|
||||||
final boolean inViolation;
|
final boolean inViolation;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a {@code SpaceQuotaSnapshot} which is in violation of the provided {@code policy}.
|
* Constructs a {@code SpaceQuotaSnapshot} which is in violation of the provided {@code policy}.
|
||||||
*
|
* <p/>
|
||||||
* Use {@link #notInViolation()} to obtain an instance of this class for the cases when the
|
* Use {@link #notInViolation()} to obtain an instance of this class for the cases when the
|
||||||
* quota is not in violation.
|
* quota is not in violation.
|
||||||
*
|
|
||||||
* @param policy The non-null policy being violated.
|
* @param policy The non-null policy being violated.
|
||||||
*/
|
*/
|
||||||
public SpaceQuotaStatus(SpaceViolationPolicy policy) {
|
public SpaceQuotaStatus(SpaceViolationPolicy policy) {
|
||||||
// If the caller is instantiating a status, the policy must be non-null
|
// If the caller is instantiating a status, the policy must be non-null
|
||||||
this (Objects.requireNonNull(policy), true);
|
this(Objects.requireNonNull(policy), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private SpaceQuotaStatus(SpaceViolationPolicy policy, boolean inViolation) {
|
private SpaceQuotaStatus(SpaceViolationPolicy policy, boolean inViolation) {
|
||||||
this.policy = policy;
|
this.policy = Optional.ofNullable(policy);
|
||||||
this.inViolation = inViolation;
|
this.inViolation = inViolation;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,13 +67,15 @@ public class SpaceQuotaSnapshot {
|
||||||
* Returns the violation policy, which may be null. It is guaranteed to be non-null if
|
* Returns the violation policy, which may be null. It is guaranteed to be non-null if
|
||||||
* {@link #isInViolation()} is {@code true}, but may be null otherwise.
|
* {@link #isInViolation()} is {@code true}, but may be null otherwise.
|
||||||
*/
|
*/
|
||||||
public SpaceViolationPolicy getPolicy() {
|
@Override
|
||||||
|
public Optional<SpaceViolationPolicy> getPolicy() {
|
||||||
return policy;
|
return policy;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return {@code true} if the quota is being violated, {@code false} otherwise.
|
* @return {@code true} if the quota is being violated, {@code false} otherwise.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public boolean isInViolation() {
|
public boolean isInViolation() {
|
||||||
return inViolation;
|
return inViolation;
|
||||||
}
|
}
|
||||||
|
@ -113,7 +114,7 @@ public class SpaceQuotaSnapshot {
|
||||||
QuotaProtos.SpaceQuotaStatus.Builder builder = QuotaProtos.SpaceQuotaStatus.newBuilder();
|
QuotaProtos.SpaceQuotaStatus.Builder builder = QuotaProtos.SpaceQuotaStatus.newBuilder();
|
||||||
builder.setInViolation(status.inViolation);
|
builder.setInViolation(status.inViolation);
|
||||||
if (status.isInViolation()) {
|
if (status.isInViolation()) {
|
||||||
builder.setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(status.getPolicy()));
|
builder.setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(status.getPolicy().get()));
|
||||||
}
|
}
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
@ -136,6 +137,7 @@ public class SpaceQuotaSnapshot {
|
||||||
/**
|
/**
|
||||||
* Returns the status of the quota.
|
* Returns the status of the quota.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public SpaceQuotaStatus getQuotaStatus() {
|
public SpaceQuotaStatus getQuotaStatus() {
|
||||||
return quotaStatus;
|
return quotaStatus;
|
||||||
}
|
}
|
||||||
|
@ -143,6 +145,7 @@ public class SpaceQuotaSnapshot {
|
||||||
/**
|
/**
|
||||||
* Returns the current usage, in bytes, of the target (e.g. table, namespace).
|
* Returns the current usage, in bytes, of the target (e.g. table, namespace).
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public long getUsage() {
|
public long getUsage() {
|
||||||
return usage;
|
return usage;
|
||||||
}
|
}
|
||||||
|
@ -150,6 +153,7 @@ public class SpaceQuotaSnapshot {
|
||||||
/**
|
/**
|
||||||
* Returns the limit, in bytes, of the target (e.g. table, namespace).
|
* Returns the limit, in bytes, of the target (e.g. table, namespace).
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public long getLimit() {
|
public long getLimit() {
|
||||||
return limit;
|
return limit;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.quotas;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A point-in-time view of a space quota on a table, read only.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
public interface SpaceQuotaSnapshotView {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encapsulates the state of a quota on a table. The quota may or may not be in violation. If the
|
||||||
|
* quota is not in violation, the violation may not be presented. If the quota is in violation,
|
||||||
|
* there is guaranteed to be presented.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
interface SpaceQuotaStatusView {
|
||||||
|
/**
|
||||||
|
* Returns the violation policy, which may not be presented. It is guaranteed to be presented if
|
||||||
|
* {@link #isInViolation()} is {@code true}, but may not be presented otherwise.
|
||||||
|
*/
|
||||||
|
Optional<SpaceViolationPolicy> getPolicy();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return {@code true} if the quota is being violated, {@code false} otherwise.
|
||||||
|
*/
|
||||||
|
boolean isInViolation();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the status of the quota.
|
||||||
|
*/
|
||||||
|
SpaceQuotaStatusView getQuotaStatus();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the current usage, in bytes, of the target (e.g. table, namespace).
|
||||||
|
*/
|
||||||
|
long getUsage();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the limit, in bytes, of the target (e.g. table, namespace).
|
||||||
|
*/
|
||||||
|
long getLimit();
|
||||||
|
}
|
|
@ -58,7 +58,7 @@ public class SpaceViolationPolicyEnforcementFactory {
|
||||||
if (!status.isInViolation()) {
|
if (!status.isInViolation()) {
|
||||||
throw new IllegalArgumentException(tableName + " is not in violation. Snapshot=" + snapshot);
|
throw new IllegalArgumentException(tableName + " is not in violation. Snapshot=" + snapshot);
|
||||||
}
|
}
|
||||||
switch (status.getPolicy()) {
|
switch (status.getPolicy().get()) {
|
||||||
case DISABLE:
|
case DISABLE:
|
||||||
enforcement = new DisableTableViolationPolicyEnforcement();
|
enforcement = new DisableTableViolationPolicyEnforcement();
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -298,10 +298,11 @@ if ( fqtn != null ) {
|
||||||
if (quota == null || !quota.hasSpace()) {
|
if (quota == null || !quota.hasSpace()) {
|
||||||
quota = QuotaTableUtil.getNamespaceQuota(master.getConnection(), tn.getNamespaceAsString());
|
quota = QuotaTableUtil.getNamespaceQuota(master.getConnection(), tn.getNamespaceAsString());
|
||||||
if (quota != null) {
|
if (quota != null) {
|
||||||
masterSnapshot = QuotaTableUtil.getCurrentSnapshot(master.getConnection(), tn.getNamespaceAsString());
|
masterSnapshot = master.getQuotaObserverChore().getNamespaceQuotaSnapshots()
|
||||||
|
.get(tn.getNamespaceAsString());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
masterSnapshot = QuotaTableUtil.getCurrentSnapshot(master.getConnection(), tn);
|
masterSnapshot = master.getQuotaObserverChore().getTableQuotaSnapshots().get(tn);
|
||||||
}
|
}
|
||||||
if (quota != null && quota.hasSpace()) {
|
if (quota != null && quota.hasSpace()) {
|
||||||
SpaceQuota spaceQuota = quota.getSpace();
|
SpaceQuota spaceQuota = quota.getSpace();
|
||||||
|
|
|
@ -397,9 +397,9 @@ public class SpaceQuotaHelperForTests {
|
||||||
public boolean evaluate() throws Exception {
|
public boolean evaluate() throws Exception {
|
||||||
SpaceQuotaSnapshot snapshot;
|
SpaceQuotaSnapshot snapshot;
|
||||||
if (null == ns) {
|
if (null == ns) {
|
||||||
snapshot = QuotaTableUtil.getCurrentSnapshot(conn, tn);
|
snapshot = (SpaceQuotaSnapshot) conn.getAdmin().getCurrentSpaceQuotaSnapshot(tn);
|
||||||
} else {
|
} else {
|
||||||
snapshot = QuotaTableUtil.getCurrentSnapshot(conn, ns);
|
snapshot = (SpaceQuotaSnapshot) conn.getAdmin().getCurrentSpaceQuotaSnapshot(ns);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("Saw quota snapshot for " + (null == tn ? ns : tn) + ": " + snapshot);
|
LOG.debug("Saw quota snapshot for " + (null == tn ? ns : tn) + ": " + snapshot);
|
||||||
|
|
|
@ -154,8 +154,8 @@ public class TestNamespaceQuotaViolationStore {
|
||||||
|
|
||||||
// Exceeds the quota, should be in violation
|
// Exceeds the quota, should be in violation
|
||||||
assertEquals(true, store.getTargetState(NS, quota).getQuotaStatus().isInViolation());
|
assertEquals(true, store.getTargetState(NS, quota).getQuotaStatus().isInViolation());
|
||||||
assertEquals(
|
assertEquals(SpaceViolationPolicy.DISABLE,
|
||||||
SpaceViolationPolicy.DISABLE, store.getTargetState(NS, quota).getQuotaStatus().getPolicy());
|
store.getTargetState(NS, quota).getQuotaStatus().getPolicy().get());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -154,14 +154,15 @@ public class TestQuotaObserverChoreWithMiniCluster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Entry<TableName,SpaceQuotaSnapshot> entry = Iterables.getOnlyElement(quotaSnapshots.entrySet());
|
Entry<TableName, SpaceQuotaSnapshot> entry =
|
||||||
|
Iterables.getOnlyElement(quotaSnapshots.entrySet());
|
||||||
assertEquals(tn, entry.getKey());
|
assertEquals(tn, entry.getKey());
|
||||||
final SpaceQuotaSnapshot snapshot = entry.getValue();
|
final SpaceQuotaSnapshot snapshot = entry.getValue();
|
||||||
assertEquals("Snapshot was " + snapshot, violationPolicy, snapshot.getQuotaStatus().getPolicy());
|
assertEquals("Snapshot was " + snapshot, violationPolicy,
|
||||||
|
snapshot.getQuotaStatus().getPolicy().get());
|
||||||
assertEquals(sizeLimit, snapshot.getLimit());
|
assertEquals(sizeLimit, snapshot.getLimit());
|
||||||
assertTrue(
|
assertTrue("The usage should be greater than the limit, but were " + snapshot.getUsage() +
|
||||||
"The usage should be greater than the limit, but were " + snapshot.getUsage() + " and "
|
" and " + snapshot.getLimit() + ", respectively", snapshot.getUsage() > snapshot.getLimit());
|
||||||
+ snapshot.getLimit() + ", respectively", snapshot.getUsage() > snapshot.getLimit());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -235,13 +236,13 @@ public class TestQuotaObserverChoreWithMiniCluster {
|
||||||
|
|
||||||
SpaceQuotaSnapshot snapshot1 = snapshots.remove(tn1);
|
SpaceQuotaSnapshot snapshot1 = snapshots.remove(tn1);
|
||||||
assertNotNull("tn1 should be in violation", snapshot1);
|
assertNotNull("tn1 should be in violation", snapshot1);
|
||||||
assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy());
|
assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy().get());
|
||||||
SpaceQuotaSnapshot snapshot2 = snapshots.remove(tn2);
|
SpaceQuotaSnapshot snapshot2 = snapshots.remove(tn2);
|
||||||
assertNotNull("tn2 should be in violation", snapshot2);
|
assertNotNull("tn2 should be in violation", snapshot2);
|
||||||
assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy());
|
assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy().get());
|
||||||
SpaceQuotaSnapshot snapshot3 = snapshots.remove(tn3);
|
SpaceQuotaSnapshot snapshot3 = snapshots.remove(tn3);
|
||||||
assertNotNull("tn3 should be in violation", snapshot3);
|
assertNotNull("tn3 should be in violation", snapshot3);
|
||||||
assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy());
|
assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy().get());
|
||||||
assertTrue("Unexpected additional quota violations: " + snapshots, snapshots.isEmpty());
|
assertTrue("Unexpected additional quota violations: " + snapshots, snapshots.isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -298,10 +299,10 @@ public class TestQuotaObserverChoreWithMiniCluster {
|
||||||
|
|
||||||
SpaceQuotaSnapshot actualPolicyTN1 = snapshots.get(tn1);
|
SpaceQuotaSnapshot actualPolicyTN1 = snapshots.get(tn1);
|
||||||
assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1);
|
assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1);
|
||||||
assertEquals(namespaceViolationPolicy, actualPolicyTN1.getQuotaStatus().getPolicy());
|
assertEquals(namespaceViolationPolicy, actualPolicyTN1.getQuotaStatus().getPolicy().get());
|
||||||
SpaceQuotaSnapshot actualPolicyTN2 = snapshots.get(tn2);
|
SpaceQuotaSnapshot actualPolicyTN2 = snapshots.get(tn2);
|
||||||
assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
|
assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
|
||||||
assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy());
|
assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get());
|
||||||
|
|
||||||
// Override the namespace quota with a table quota
|
// Override the namespace quota with a table quota
|
||||||
final long tableSizeLimit = SpaceQuotaHelperForTests.ONE_MEGABYTE;
|
final long tableSizeLimit = SpaceQuotaHelperForTests.ONE_MEGABYTE;
|
||||||
|
@ -315,7 +316,7 @@ public class TestQuotaObserverChoreWithMiniCluster {
|
||||||
snapshots = snapshotNotifier.copySnapshots();
|
snapshots = snapshotNotifier.copySnapshots();
|
||||||
SpaceQuotaSnapshot actualTableSnapshot = snapshots.get(tn1);
|
SpaceQuotaSnapshot actualTableSnapshot = snapshots.get(tn1);
|
||||||
assertNotNull("Violation policy should never be null", actualTableSnapshot);
|
assertNotNull("Violation policy should never be null", actualTableSnapshot);
|
||||||
if (tableViolationPolicy != actualTableSnapshot.getQuotaStatus().getPolicy()) {
|
if (tableViolationPolicy != actualTableSnapshot.getQuotaStatus().getPolicy().orElse(null)) {
|
||||||
LOG.debug("Saw unexpected table violation policy, waiting and re-checking.");
|
LOG.debug("Saw unexpected table violation policy, waiting and re-checking.");
|
||||||
try {
|
try {
|
||||||
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
Thread.sleep(DEFAULT_WAIT_MILLIS);
|
||||||
|
@ -325,14 +326,14 @@ public class TestQuotaObserverChoreWithMiniCluster {
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
assertEquals(tableViolationPolicy, actualTableSnapshot.getQuotaStatus().getPolicy());
|
assertEquals(tableViolationPolicy, actualTableSnapshot.getQuotaStatus().getPolicy().get());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// This should not change with the introduction of the table quota for tn1
|
// This should not change with the introduction of the table quota for tn1
|
||||||
actualPolicyTN2 = snapshots.get(tn2);
|
actualPolicyTN2 = snapshots.get(tn2);
|
||||||
assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
|
assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
|
||||||
assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy());
|
assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -108,7 +108,7 @@ public class TestQuotaStatusRPCs {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
Map<TableName,Long> sizes = QuotaTableUtil.getMasterReportedTableSizes(TEST_UTIL.getConnection());
|
Map<TableName, Long> sizes = TEST_UTIL.getAdmin().getSpaceQuotaTableSizes();
|
||||||
Long size = sizes.get(tn);
|
Long size = sizes.get(tn);
|
||||||
assertNotNull("No reported size for " + tn, size);
|
assertNotNull("No reported size for " + tn, size);
|
||||||
assertTrue("Reported table size was " + size, size.longValue() >= tableSize);
|
assertTrue("Reported table size was " + size, size.longValue() >= tableSize);
|
||||||
|
@ -142,8 +142,9 @@ public class TestQuotaStatusRPCs {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
Map<TableName, SpaceQuotaSnapshot> snapshots = QuotaTableUtil.getRegionServerQuotaSnapshots(
|
@SuppressWarnings("unchecked")
|
||||||
TEST_UTIL.getConnection(), rs.getServerName());
|
Map<TableName, SpaceQuotaSnapshot> snapshots = (Map<TableName, SpaceQuotaSnapshot>) TEST_UTIL
|
||||||
|
.getAdmin().getRegionServerSpaceQuotaSnapshots(rs.getServerName());
|
||||||
SpaceQuotaSnapshot snapshot = snapshots.get(tn);
|
SpaceQuotaSnapshot snapshot = snapshots.get(tn);
|
||||||
assertNotNull("Did not find snapshot for " + tn, snapshot);
|
assertNotNull("Did not find snapshot for " + tn, snapshot);
|
||||||
assertTrue(
|
assertTrue(
|
||||||
|
@ -189,12 +190,13 @@ public class TestQuotaStatusRPCs {
|
||||||
});
|
});
|
||||||
|
|
||||||
// We obtain the violations for a RegionServer by observing the snapshots
|
// We obtain the violations for a RegionServer by observing the snapshots
|
||||||
Map<TableName,SpaceQuotaSnapshot> snapshots =
|
@SuppressWarnings("unchecked")
|
||||||
QuotaTableUtil.getRegionServerQuotaSnapshots(TEST_UTIL.getConnection(), rs.getServerName());
|
Map<TableName, SpaceQuotaSnapshot> snapshots = (Map<TableName, SpaceQuotaSnapshot>) TEST_UTIL
|
||||||
|
.getAdmin().getRegionServerSpaceQuotaSnapshots(rs.getServerName());
|
||||||
SpaceQuotaSnapshot snapshot = snapshots.get(tn);
|
SpaceQuotaSnapshot snapshot = snapshots.get(tn);
|
||||||
assertNotNull("Did not find snapshot for " + tn, snapshot);
|
assertNotNull("Did not find snapshot for " + tn, snapshot);
|
||||||
assertTrue(snapshot.getQuotaStatus().isInViolation());
|
assertTrue(snapshot.getQuotaStatus().isInViolation());
|
||||||
assertEquals(SpaceViolationPolicy.NO_INSERTS, snapshot.getQuotaStatus().getPolicy());
|
assertEquals(SpaceViolationPolicy.NO_INSERTS, snapshot.getQuotaStatus().getPolicy().get());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -224,7 +226,8 @@ public class TestQuotaStatusRPCs {
|
||||||
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000, new Predicate<Exception>() {
|
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000, new Predicate<Exception>() {
|
||||||
@Override
|
@Override
|
||||||
public boolean evaluate() throws Exception {
|
public boolean evaluate() throws Exception {
|
||||||
SpaceQuotaSnapshot snapshot = QuotaTableUtil.getCurrentSnapshot(conn, tn);
|
SpaceQuotaSnapshot snapshot =
|
||||||
|
(SpaceQuotaSnapshot) conn.getAdmin().getCurrentSpaceQuotaSnapshot(tn);
|
||||||
LOG.info("Table snapshot after initial ingest: " + snapshot);
|
LOG.info("Table snapshot after initial ingest: " + snapshot);
|
||||||
if (snapshot == null) {
|
if (snapshot == null) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -237,8 +240,8 @@ public class TestQuotaStatusRPCs {
|
||||||
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000 * 1000, new Predicate<Exception>() {
|
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000 * 1000, new Predicate<Exception>() {
|
||||||
@Override
|
@Override
|
||||||
public boolean evaluate() throws Exception {
|
public boolean evaluate() throws Exception {
|
||||||
SpaceQuotaSnapshot snapshot = QuotaTableUtil.getCurrentSnapshot(
|
SpaceQuotaSnapshot snapshot = (SpaceQuotaSnapshot) conn.getAdmin()
|
||||||
conn, tn.getNamespaceAsString());
|
.getCurrentSpaceQuotaSnapshot(tn.getNamespaceAsString());
|
||||||
LOG.debug("Namespace snapshot after initial ingest: " + snapshot);
|
LOG.debug("Namespace snapshot after initial ingest: " + snapshot);
|
||||||
if (snapshot == null) {
|
if (snapshot == null) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -250,7 +253,8 @@ public class TestQuotaStatusRPCs {
|
||||||
|
|
||||||
// Sanity check: the below assertions will fail if we somehow write too much data
|
// Sanity check: the below assertions will fail if we somehow write too much data
|
||||||
// and force the table to move into violation before we write the second bit of data.
|
// and force the table to move into violation before we write the second bit of data.
|
||||||
SpaceQuotaSnapshot snapshot = QuotaTableUtil.getCurrentSnapshot(conn, tn);
|
SpaceQuotaSnapshot snapshot =
|
||||||
|
(SpaceQuotaSnapshot) conn.getAdmin().getCurrentSpaceQuotaSnapshot(tn);
|
||||||
assertTrue("QuotaSnapshot for " + tn + " should be non-null and not in violation",
|
assertTrue("QuotaSnapshot for " + tn + " should be non-null and not in violation",
|
||||||
snapshot != null && !snapshot.getQuotaStatus().isInViolation());
|
snapshot != null && !snapshot.getQuotaStatus().isInViolation());
|
||||||
|
|
||||||
|
@ -264,7 +268,8 @@ public class TestQuotaStatusRPCs {
|
||||||
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000, new Predicate<Exception>() {
|
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000, new Predicate<Exception>() {
|
||||||
@Override
|
@Override
|
||||||
public boolean evaluate() throws Exception {
|
public boolean evaluate() throws Exception {
|
||||||
SpaceQuotaSnapshot snapshot = QuotaTableUtil.getCurrentSnapshot(conn, tn);
|
SpaceQuotaSnapshot snapshot =
|
||||||
|
(SpaceQuotaSnapshot) conn.getAdmin().getCurrentSpaceQuotaSnapshot(tn);
|
||||||
LOG.info("Table snapshot after second ingest: " + snapshot);
|
LOG.info("Table snapshot after second ingest: " + snapshot);
|
||||||
if (snapshot == null) {
|
if (snapshot == null) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -276,8 +281,8 @@ public class TestQuotaStatusRPCs {
|
||||||
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000, new Predicate<Exception>() {
|
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000, new Predicate<Exception>() {
|
||||||
@Override
|
@Override
|
||||||
public boolean evaluate() throws Exception {
|
public boolean evaluate() throws Exception {
|
||||||
SpaceQuotaSnapshot snapshot = QuotaTableUtil.getCurrentSnapshot(
|
SpaceQuotaSnapshot snapshot = (SpaceQuotaSnapshot) conn.getAdmin()
|
||||||
conn, tn.getNamespaceAsString());
|
.getCurrentSpaceQuotaSnapshot(tn.getNamespaceAsString());
|
||||||
LOG.debug("Namespace snapshot after second ingest: " + snapshot);
|
LOG.debug("Namespace snapshot after second ingest: " + snapshot);
|
||||||
if (snapshot == null) {
|
if (snapshot == null) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -122,7 +122,7 @@ public class TestSpaceQuotasWithSnapshots {
|
||||||
waitForStableQuotaSize(conn, tn, null);
|
waitForStableQuotaSize(conn, tn, null);
|
||||||
|
|
||||||
// The actual size on disk after we wrote our data the first time
|
// The actual size on disk after we wrote our data the first time
|
||||||
final long actualInitialSize = QuotaTableUtil.getCurrentSnapshot(conn, tn).getUsage();
|
final long actualInitialSize = conn.getAdmin().getCurrentSpaceQuotaSnapshot(tn).getUsage();
|
||||||
LOG.info("Initial table size was " + actualInitialSize);
|
LOG.info("Initial table size was " + actualInitialSize);
|
||||||
|
|
||||||
LOG.info("Snapshot the table");
|
LOG.info("Snapshot the table");
|
||||||
|
@ -217,7 +217,7 @@ public class TestSpaceQuotasWithSnapshots {
|
||||||
waitForStableQuotaSize(conn, null, ns);
|
waitForStableQuotaSize(conn, null, ns);
|
||||||
|
|
||||||
// The actual size on disk after we wrote our data the first time
|
// The actual size on disk after we wrote our data the first time
|
||||||
final long actualInitialSize = QuotaTableUtil.getCurrentSnapshot(conn, ns).getUsage();
|
final long actualInitialSize = conn.getAdmin().getCurrentSpaceQuotaSnapshot(ns).getUsage();
|
||||||
LOG.info("Initial table size was " + actualInitialSize);
|
LOG.info("Initial table size was " + actualInitialSize);
|
||||||
|
|
||||||
LOG.info("Snapshot the table");
|
LOG.info("Snapshot the table");
|
||||||
|
@ -241,7 +241,7 @@ public class TestSpaceQuotasWithSnapshots {
|
||||||
TEST_UTIL.waitFor(30 * 1000, 500, new Predicate<Exception>() {
|
TEST_UTIL.waitFor(30 * 1000, 500, new Predicate<Exception>() {
|
||||||
@Override
|
@Override
|
||||||
public boolean evaluate() throws Exception {
|
public boolean evaluate() throws Exception {
|
||||||
Map<TableName,Long> sizes = QuotaTableUtil.getMasterReportedTableSizes(conn);
|
Map<TableName, Long> sizes = conn.getAdmin().getSpaceQuotaTableSizes();
|
||||||
LOG.debug("Master observed table sizes from region size reports: " + sizes);
|
LOG.debug("Master observed table sizes from region size reports: " + sizes);
|
||||||
Long size = sizes.get(tn);
|
Long size = sizes.get(tn);
|
||||||
if (null == size) {
|
if (null == size) {
|
||||||
|
@ -374,7 +374,7 @@ public class TestSpaceQuotasWithSnapshots {
|
||||||
waitForStableQuotaSize(conn, tn, null);
|
waitForStableQuotaSize(conn, tn, null);
|
||||||
|
|
||||||
// The actual size on disk after we wrote our data the first time
|
// The actual size on disk after we wrote our data the first time
|
||||||
final long actualInitialSize = QuotaTableUtil.getCurrentSnapshot(conn, tn).getUsage();
|
final long actualInitialSize = conn.getAdmin().getCurrentSpaceQuotaSnapshot(tn).getUsage();
|
||||||
LOG.info("Initial table size was " + actualInitialSize);
|
LOG.info("Initial table size was " + actualInitialSize);
|
||||||
|
|
||||||
LOG.info("Snapshot the table");
|
LOG.info("Snapshot the table");
|
||||||
|
@ -397,7 +397,8 @@ public class TestSpaceQuotasWithSnapshots {
|
||||||
});
|
});
|
||||||
|
|
||||||
// We know that reports were sent by our RS, verify that they take up zero size.
|
// We know that reports were sent by our RS, verify that they take up zero size.
|
||||||
SpaceQuotaSnapshot snapshot = QuotaTableUtil.getCurrentSnapshot(conn, tn2);
|
SpaceQuotaSnapshot snapshot =
|
||||||
|
(SpaceQuotaSnapshot) conn.getAdmin().getCurrentSpaceQuotaSnapshot(tn2);
|
||||||
assertNotNull(snapshot);
|
assertNotNull(snapshot);
|
||||||
assertEquals(0, snapshot.getUsage());
|
assertEquals(0, snapshot.getUsage());
|
||||||
|
|
||||||
|
@ -436,7 +437,7 @@ public class TestSpaceQuotasWithSnapshots {
|
||||||
}
|
}
|
||||||
|
|
||||||
long getRegionSizeReportForTable(Connection conn, TableName tn) throws IOException {
|
long getRegionSizeReportForTable(Connection conn, TableName tn) throws IOException {
|
||||||
Map<TableName,Long> sizes = QuotaTableUtil.getMasterReportedTableSizes(conn);
|
Map<TableName, Long> sizes = conn.getAdmin().getSpaceQuotaTableSizes();
|
||||||
Long value = sizes.get(tn);
|
Long value = sizes.get(tn);
|
||||||
if (null == value) {
|
if (null == value) {
|
||||||
return 0L;
|
return 0L;
|
||||||
|
|
|
@ -175,7 +175,7 @@ module Hbase
|
||||||
end
|
end
|
||||||
|
|
||||||
def get_master_table_sizes
|
def get_master_table_sizes
|
||||||
QuotaTableUtil.getMasterReportedTableSizes(@admin.getConnection)
|
@admin.getSpaceQuotaTableSizes
|
||||||
end
|
end
|
||||||
|
|
||||||
def get_quota_snapshots(regionserver = nil)
|
def get_quota_snapshots(regionserver = nil)
|
||||||
|
@ -192,8 +192,7 @@ module Hbase
|
||||||
|
|
||||||
def get_rs_quota_snapshots(rs)
|
def get_rs_quota_snapshots(rs)
|
||||||
# Reads the snapshots from a specific regionserver
|
# Reads the snapshots from a specific regionserver
|
||||||
QuotaTableUtil.getRegionServerQuotaSnapshots(@admin.getConnection,
|
@admin.getRegionServerSpaceQuotaSnapshots(ServerName.valueOf(rs))
|
||||||
ServerName.valueOf(rs))
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def set_global_bypass(bypass, args)
|
def set_global_bypass(bypass, args)
|
||||||
|
|
|
@ -65,7 +65,7 @@ EOF
|
||||||
def get_policy(status)
|
def get_policy(status)
|
||||||
# Unwrap the violation policy if it exists
|
# Unwrap the violation policy if it exists
|
||||||
if status.isInViolation
|
if status.isInViolation
|
||||||
status.getPolicy.name
|
status.getPolicy.get.name
|
||||||
else
|
else
|
||||||
'None'
|
'None'
|
||||||
end
|
end
|
||||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaFilter;
|
import org.apache.hadoop.hbase.quotas.QuotaFilter;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaRetriever;
|
import org.apache.hadoop.hbase.quotas.QuotaRetriever;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaSettings;
|
import org.apache.hadoop.hbase.quotas.QuotaSettings;
|
||||||
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||||
|
@ -1386,4 +1387,26 @@ public class ThriftAdmin implements Admin {
|
||||||
public Future<Void> deleteNamespaceAsync(String name) {
|
public Future<Void> deleteNamespaceAsync(String name) {
|
||||||
throw new NotImplementedException("deleteNamespaceAsync not supported in ThriftAdmin");
|
throw new NotImplementedException("deleteNamespaceAsync not supported in ThriftAdmin");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<TableName, Long> getSpaceQuotaTableSizes() throws IOException {
|
||||||
|
throw new NotImplementedException("getSpaceQuotaTableSizes not supported in ThriftAdmin");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<TableName, SpaceQuotaSnapshot> getRegionServerSpaceQuotaSnapshots(
|
||||||
|
ServerName serverName) throws IOException {
|
||||||
|
throw new NotImplementedException(
|
||||||
|
"getRegionServerSpaceQuotaSnapshots not supported in ThriftAdmin");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SpaceQuotaSnapshot getCurrentSpaceQuotaSnapshot(String namespace) throws IOException {
|
||||||
|
throw new NotImplementedException("getCurrentSpaceQuotaSnapshot not supported in ThriftAdmin");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SpaceQuotaSnapshot getCurrentSpaceQuotaSnapshot(TableName tableName) throws IOException {
|
||||||
|
throw new NotImplementedException("getCurrentSpaceQuotaSnapshot not supported in ThriftAdmin");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue