HBASE-17866: Implement async setQuota/getQuota methods

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
huzheng 2017-04-14 14:51:38 +08:00 committed by Guanghao Zhang
parent 7678855fac
commit 8db9760363
5 changed files with 306 additions and 28 deletions

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -27,6 +28,8 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
/** /**
@ -465,4 +468,17 @@ public interface AsyncAdmin {
* startcode. Here is an example: <code> host187.example.com,60020,1289493121758</code> * startcode. Here is an example: <code> host187.example.com,60020,1289493121758</code>
*/ */
CompletableFuture<Void> move(final byte[] regionName, final byte[] destServerName); CompletableFuture<Void> move(final byte[] regionName, final byte[] destServerName);
/**
* Apply the new quota settings.
* @param quota the quota settings
*/
CompletableFuture<Void> setQuota(final QuotaSettings quota);
/**
* List the quotas based on the filter.
* @param filter the quota settings filter
* @return the QuotaSetting list, which wrapped by a CompletableFuture.
*/
CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter);
} }

View File

@ -56,6 +56,9 @@ import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterReques
import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@ -112,6 +115,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineReg
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
@ -1149,6 +1154,48 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
return future; return future;
} }
@Override
public CompletableFuture<Void> setQuota(QuotaSettings quota){
return this.<Void> newMasterCaller()
.action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(
controller, stub, QuotaSettings.buildSetQuotaRequestProto(quota),
(s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
.call();
}
@Override
public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
Scan scan = QuotaTableUtil.makeScan(filter);
this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
new RawScanResultConsumer() {
List<QuotaSettings> settings = new ArrayList<>();
@Override
public void onNext(Result[] results, ScanController controller) {
for (Result result : results) {
try {
QuotaTableUtil.parseResultToCollection(result, settings);
} catch (IOException e) {
controller.terminate();
future.completeExceptionally(e);
}
}
}
@Override
public void onError(Throwable error) {
future.completeExceptionally(error);
}
@Override
public void onComplete() {
future.complete(settings);
}
});
return future;
}
private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
if (numRegions < 3) { if (numRegions < 3) {
throw new IllegalArgumentException("Must create at least three regions"); throw new IllegalArgumentException("Must create at least three regions");

View File

@ -86,34 +86,10 @@ public class QuotaRetriever implements Closeable, Iterable<QuotaSettings> {
public QuotaSettings next() throws IOException { public QuotaSettings next() throws IOException {
if (cache.isEmpty()) { if (cache.isEmpty()) {
Result result = scanner.next(); Result result = scanner.next();
if (result == null) return null; if (result == null) {
return null;
QuotaTableUtil.parseResult(result, new QuotaTableUtil.QuotasVisitor() {
@Override
public void visitUserQuotas(String userName, Quotas quotas) {
cache.addAll(QuotaSettingsFactory.fromUserQuotas(userName, quotas));
} }
QuotaTableUtil.parseResultToCollection(result, cache);
@Override
public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
cache.addAll(QuotaSettingsFactory.fromUserQuotas(userName, table, quotas));
}
@Override
public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
cache.addAll(QuotaSettingsFactory.fromUserQuotas(userName, namespace, quotas));
}
@Override
public void visitTableQuotas(TableName tableName, Quotas quotas) {
cache.addAll(QuotaSettingsFactory.fromTableQuotas(tableName, quotas));
}
@Override
public void visitNamespaceQuotas(String namespace, Quotas quotas) {
cache.addAll(QuotaSettingsFactory.fromNamespaceQuotas(namespace, quotas));
}
});
} }
return cache.poll(); return cache.poll();
} }

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.quotas;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -236,6 +237,37 @@ public class QuotaTableUtil {
} }
} }
public static void parseResultToCollection(final Result result,
Collection<QuotaSettings> quotaSettings) throws IOException {
QuotaTableUtil.parseResult(result, new QuotaTableUtil.QuotasVisitor() {
@Override
public void visitUserQuotas(String userName, Quotas quotas) {
quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, quotas));
}
@Override
public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, table, quotas));
}
@Override
public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, namespace, quotas));
}
@Override
public void visitTableQuotas(TableName tableName, Quotas quotas) {
quotaSettings.addAll(QuotaSettingsFactory.fromTableQuotas(tableName, quotas));
}
@Override
public void visitNamespaceQuotas(String namespace, Quotas quotas) {
quotaSettings.addAll(QuotaSettingsFactory.fromNamespaceQuotas(namespace, quotas));
}
});
}
public static void parseNamespaceResult(final Result result, public static void parseNamespaceResult(final Result result,
final NamespaceQuotasVisitor visitor) throws IOException { final NamespaceQuotasVisitor visitor) throws IOException {
String namespace = getNamespaceFromRowKey(result.getRow()); String namespace = getNamespaceFromRowKey(result.getRow());

View File

@ -0,0 +1,207 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.quotas.QuotaCache;
import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.quotas.QuotaSettingsFactory;
import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.ThrottleType;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncQuotaAdminApi {
private static final Log LOG = LogFactory.getLog(TestAsyncQuotaAdminApi.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected static AsyncConnection ASYNC_CONN;
protected AsyncAdmin admin;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 2000);
TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
IOUtils.closeQuietly(ASYNC_CONN);
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setUp() throws Exception {
this.admin = ASYNC_CONN.getAdmin();
}
@Test
public void testThrottleType() throws Exception {
String userName = User.getCurrent().getShortName();
admin.setQuota(
QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES))
.get();
admin.setQuota(
QuotaSettingsFactory.throttleUser(userName, ThrottleType.WRITE_NUMBER, 12, TimeUnit.MINUTES))
.get();
admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, true)).get();
int countThrottle = 0;
int countGlobalBypass = 0;
for (QuotaSettings settings : admin.getQuota(null).get()) {
switch (settings.getQuotaType()) {
case THROTTLE:
countThrottle++;
break;
case GLOBAL_BYPASS:
countGlobalBypass++;
break;
default:
fail("unexpected settings type: " + settings.getQuotaType());
}
}
assertEquals(2, countThrottle);
assertEquals(1, countGlobalBypass);
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)).get();
assertNumResults(1, null);
admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, false)).get();
assertNumResults(0, null);
}
@Test
public void testQuotaRetrieverFilter() throws Exception {
TableName[] tables = new TableName[] { TableName.valueOf("T0"), TableName.valueOf("T01"),
TableName.valueOf("NS0:T2"), };
String[] namespaces = new String[] { "NS0", "NS01", "NS2" };
String[] users = new String[] { "User0", "User01", "User2" };
for (String user : users) {
admin.setQuota(
QuotaSettingsFactory.throttleUser(user, ThrottleType.REQUEST_NUMBER, 1, TimeUnit.MINUTES))
.get();
for (TableName table : tables) {
admin.setQuota(QuotaSettingsFactory.throttleUser(user, table, ThrottleType.REQUEST_NUMBER,
2, TimeUnit.MINUTES)).get();
}
for (String ns : namespaces) {
admin.setQuota(QuotaSettingsFactory.throttleUser(user, ns, ThrottleType.REQUEST_NUMBER, 3,
TimeUnit.MINUTES)).get();
}
}
assertNumResults(21, null);
for (TableName table : tables) {
admin.setQuota(
QuotaSettingsFactory.throttleTable(table, ThrottleType.REQUEST_NUMBER, 4, TimeUnit.MINUTES))
.get();
}
assertNumResults(24, null);
for (String ns : namespaces) {
admin.setQuota(QuotaSettingsFactory.throttleNamespace(ns, ThrottleType.REQUEST_NUMBER, 5,
TimeUnit.MINUTES)).get();
}
assertNumResults(27, null);
assertNumResults(7, new QuotaFilter().setUserFilter("User0"));
assertNumResults(0, new QuotaFilter().setUserFilter("User"));
assertNumResults(21, new QuotaFilter().setUserFilter("User.*"));
assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setTableFilter("T0"));
assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setTableFilter("NS.*"));
assertNumResults(0, new QuotaFilter().setUserFilter("User.*").setTableFilter("T"));
assertNumResults(6, new QuotaFilter().setUserFilter("User.*").setTableFilter("T.*"));
assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS0"));
assertNumResults(0, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS"));
assertNumResults(9, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS.*"));
assertNumResults(6,
new QuotaFilter().setUserFilter("User.*").setTableFilter("T0").setNamespaceFilter("NS0"));
assertNumResults(1, new QuotaFilter().setTableFilter("T0"));
assertNumResults(0, new QuotaFilter().setTableFilter("T"));
assertNumResults(2, new QuotaFilter().setTableFilter("T.*"));
assertNumResults(3, new QuotaFilter().setTableFilter(".*T.*"));
assertNumResults(1, new QuotaFilter().setNamespaceFilter("NS0"));
assertNumResults(0, new QuotaFilter().setNamespaceFilter("NS"));
assertNumResults(3, new QuotaFilter().setNamespaceFilter("NS.*"));
for (String user : users) {
admin.setQuota(QuotaSettingsFactory.unthrottleUser(user)).get();
for (TableName table : tables) {
admin.setQuota(QuotaSettingsFactory.unthrottleUser(user, table)).get();
}
for (String ns : namespaces) {
admin.setQuota(QuotaSettingsFactory.unthrottleUser(user, ns)).get();
}
}
assertNumResults(6, null);
for (TableName table : tables) {
admin.setQuota(QuotaSettingsFactory.unthrottleTable(table)).get();
}
assertNumResults(3, null);
for (String ns : namespaces) {
admin.setQuota(QuotaSettingsFactory.unthrottleNamespace(ns)).get();
}
assertNumResults(0, null);
}
private void assertNumResults(int expected, final QuotaFilter filter) throws Exception {
assertEquals(expected, countResults(filter));
}
private int countResults(final QuotaFilter filter) throws Exception {
int count = 0;
for (QuotaSettings settings : admin.getQuota(filter).get()) {
LOG.debug(settings);
count++;
}
return count;
}
}