From 8db9760363890d4d0bfaba25ae6797d45aaf7fec Mon Sep 17 00:00:00 2001 From: huzheng Date: Fri, 14 Apr 2017 14:51:38 +0800 Subject: [PATCH] HBASE-17866: Implement async setQuota/getQuota methods Signed-off-by: Guanghao Zhang --- .../hadoop/hbase/client/AsyncAdmin.java | 16 ++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 47 ++++ .../hadoop/hbase/quotas/QuotaRetriever.java | 32 +-- .../hadoop/hbase/quotas/QuotaTableUtil.java | 32 +++ .../hbase/client/TestAsyncQuotaAdminApi.java | 207 ++++++++++++++++++ 5 files changed, 306 insertions(+), 28 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index ab791c252d2..270f28f7dd6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; @@ -27,6 +28,8 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.quotas.QuotaFilter; +import org.apache.hadoop.hbase.quotas.QuotaSettings; import org.apache.hadoop.hbase.util.Pair; /** @@ -465,4 +468,17 @@ public interface AsyncAdmin { * startcode. Here is an example: host187.example.com,60020,1289493121758 */ CompletableFuture move(final byte[] regionName, final byte[] destServerName); + + /** + * Apply the new quota settings. + * @param quota the quota settings + */ + CompletableFuture setQuota(final QuotaSettings quota); + + /** + * List the quotas based on the filter. + * @param filter the quota settings filter + * @return the QuotaSetting list, which wrapped by a CompletableFuture. + */ + CompletableFuture> getQuota(QuotaFilter filter); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index e42ee5712a7..180cd19ac52 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -56,6 +56,9 @@ import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterReques import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.quotas.QuotaFilter; +import org.apache.hadoop.hbase.quotas.QuotaSettings; +import org.apache.hadoop.hbase.quotas.QuotaTableUtil; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; @@ -112,6 +115,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineReg import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; @@ -1149,6 +1154,48 @@ public class AsyncHBaseAdmin implements AsyncAdmin { return future; } + @Override + public CompletableFuture setQuota(QuotaSettings quota){ + return this. newMasterCaller() + .action((controller, stub) -> this. call( + controller, stub, QuotaSettings.buildSetQuotaRequestProto(quota), + (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)) + .call(); + } + + @Override + public CompletableFuture> getQuota(QuotaFilter filter) { + CompletableFuture> future = new CompletableFuture<>(); + Scan scan = QuotaTableUtil.makeScan(filter); + this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan, + new RawScanResultConsumer() { + List settings = new ArrayList<>(); + + @Override + public void onNext(Result[] results, ScanController controller) { + for (Result result : results) { + try { + QuotaTableUtil.parseResultToCollection(result, settings); + } catch (IOException e) { + controller.terminate(); + future.completeExceptionally(e); + } + } + } + + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } + + @Override + public void onComplete() { + future.complete(settings); + } + }); + return future; + } + private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { if (numRegions < 3) { throw new IllegalArgumentException("Must create at least three regions"); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java index cba6a240818..0f7baa52571 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java @@ -86,34 +86,10 @@ public class QuotaRetriever implements Closeable, Iterable { public QuotaSettings next() throws IOException { if (cache.isEmpty()) { Result result = scanner.next(); - if (result == null) return null; - - QuotaTableUtil.parseResult(result, new QuotaTableUtil.QuotasVisitor() { - @Override - public void visitUserQuotas(String userName, Quotas quotas) { - cache.addAll(QuotaSettingsFactory.fromUserQuotas(userName, quotas)); - } - - @Override - public void visitUserQuotas(String userName, TableName table, Quotas quotas) { - cache.addAll(QuotaSettingsFactory.fromUserQuotas(userName, table, quotas)); - } - - @Override - public void visitUserQuotas(String userName, String namespace, Quotas quotas) { - cache.addAll(QuotaSettingsFactory.fromUserQuotas(userName, namespace, quotas)); - } - - @Override - public void visitTableQuotas(TableName tableName, Quotas quotas) { - cache.addAll(QuotaSettingsFactory.fromTableQuotas(tableName, quotas)); - } - - @Override - public void visitNamespaceQuotas(String namespace, Quotas quotas) { - cache.addAll(QuotaSettingsFactory.fromNamespaceQuotas(namespace, quotas)); - } - }); + if (result == null) { + return null; + } + QuotaTableUtil.parseResultToCollection(result, cache); } return cache.poll(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java index 116dd0c65e7..c44090f2b21 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.quotas; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.regex.Pattern; @@ -236,6 +237,37 @@ public class QuotaTableUtil { } } + public static void parseResultToCollection(final Result result, + Collection quotaSettings) throws IOException { + + QuotaTableUtil.parseResult(result, new QuotaTableUtil.QuotasVisitor() { + @Override + public void visitUserQuotas(String userName, Quotas quotas) { + quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, quotas)); + } + + @Override + public void visitUserQuotas(String userName, TableName table, Quotas quotas) { + quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, table, quotas)); + } + + @Override + public void visitUserQuotas(String userName, String namespace, Quotas quotas) { + quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, namespace, quotas)); + } + + @Override + public void visitTableQuotas(TableName tableName, Quotas quotas) { + quotaSettings.addAll(QuotaSettingsFactory.fromTableQuotas(tableName, quotas)); + } + + @Override + public void visitNamespaceQuotas(String namespace, Quotas quotas) { + quotaSettings.addAll(QuotaSettingsFactory.fromNamespaceQuotas(namespace, quotas)); + } + }); + } + public static void parseNamespaceResult(final Result result, final NamespaceQuotasVisitor visitor) throws IOException { String namespace = getNamespaceFromRowKey(result.getRow()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java new file mode 100644 index 00000000000..ac9bc16bf7b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.quotas.QuotaCache; +import org.apache.hadoop.hbase.quotas.QuotaFilter; +import org.apache.hadoop.hbase.quotas.QuotaSettings; +import org.apache.hadoop.hbase.quotas.QuotaSettingsFactory; +import org.apache.hadoop.hbase.quotas.QuotaTableUtil; +import org.apache.hadoop.hbase.quotas.QuotaUtil; +import org.apache.hadoop.hbase.quotas.ThrottleType; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncQuotaAdminApi { + private static final Log LOG = LogFactory.getLog(TestAsyncQuotaAdminApi.class); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + protected static AsyncConnection ASYNC_CONN; + protected AsyncAdmin admin; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 2000); + TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); + TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true); + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + IOUtils.closeQuietly(ASYNC_CONN); + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + this.admin = ASYNC_CONN.getAdmin(); + } + + @Test + public void testThrottleType() throws Exception { + String userName = User.getCurrent().getShortName(); + + admin.setQuota( + QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES)) + .get(); + admin.setQuota( + QuotaSettingsFactory.throttleUser(userName, ThrottleType.WRITE_NUMBER, 12, TimeUnit.MINUTES)) + .get(); + admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, true)).get(); + + int countThrottle = 0; + int countGlobalBypass = 0; + for (QuotaSettings settings : admin.getQuota(null).get()) { + switch (settings.getQuotaType()) { + case THROTTLE: + countThrottle++; + break; + case GLOBAL_BYPASS: + countGlobalBypass++; + break; + default: + fail("unexpected settings type: " + settings.getQuotaType()); + } + } + assertEquals(2, countThrottle); + assertEquals(1, countGlobalBypass); + + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)).get(); + assertNumResults(1, null); + admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, false)).get(); + assertNumResults(0, null); + } + + @Test + public void testQuotaRetrieverFilter() throws Exception { + TableName[] tables = new TableName[] { TableName.valueOf("T0"), TableName.valueOf("T01"), + TableName.valueOf("NS0:T2"), }; + String[] namespaces = new String[] { "NS0", "NS01", "NS2" }; + String[] users = new String[] { "User0", "User01", "User2" }; + + for (String user : users) { + admin.setQuota( + QuotaSettingsFactory.throttleUser(user, ThrottleType.REQUEST_NUMBER, 1, TimeUnit.MINUTES)) + .get(); + + for (TableName table : tables) { + admin.setQuota(QuotaSettingsFactory.throttleUser(user, table, ThrottleType.REQUEST_NUMBER, + 2, TimeUnit.MINUTES)).get(); + } + + for (String ns : namespaces) { + admin.setQuota(QuotaSettingsFactory.throttleUser(user, ns, ThrottleType.REQUEST_NUMBER, 3, + TimeUnit.MINUTES)).get(); + } + } + assertNumResults(21, null); + + for (TableName table : tables) { + admin.setQuota( + QuotaSettingsFactory.throttleTable(table, ThrottleType.REQUEST_NUMBER, 4, TimeUnit.MINUTES)) + .get(); + } + assertNumResults(24, null); + + for (String ns : namespaces) { + admin.setQuota(QuotaSettingsFactory.throttleNamespace(ns, ThrottleType.REQUEST_NUMBER, 5, + TimeUnit.MINUTES)).get(); + } + assertNumResults(27, null); + + assertNumResults(7, new QuotaFilter().setUserFilter("User0")); + assertNumResults(0, new QuotaFilter().setUserFilter("User")); + assertNumResults(21, new QuotaFilter().setUserFilter("User.*")); + assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setTableFilter("T0")); + assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setTableFilter("NS.*")); + assertNumResults(0, new QuotaFilter().setUserFilter("User.*").setTableFilter("T")); + assertNumResults(6, new QuotaFilter().setUserFilter("User.*").setTableFilter("T.*")); + assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS0")); + assertNumResults(0, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS")); + assertNumResults(9, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS.*")); + assertNumResults(6, + new QuotaFilter().setUserFilter("User.*").setTableFilter("T0").setNamespaceFilter("NS0")); + assertNumResults(1, new QuotaFilter().setTableFilter("T0")); + assertNumResults(0, new QuotaFilter().setTableFilter("T")); + assertNumResults(2, new QuotaFilter().setTableFilter("T.*")); + assertNumResults(3, new QuotaFilter().setTableFilter(".*T.*")); + assertNumResults(1, new QuotaFilter().setNamespaceFilter("NS0")); + assertNumResults(0, new QuotaFilter().setNamespaceFilter("NS")); + assertNumResults(3, new QuotaFilter().setNamespaceFilter("NS.*")); + + for (String user : users) { + admin.setQuota(QuotaSettingsFactory.unthrottleUser(user)).get(); + for (TableName table : tables) { + admin.setQuota(QuotaSettingsFactory.unthrottleUser(user, table)).get(); + } + for (String ns : namespaces) { + admin.setQuota(QuotaSettingsFactory.unthrottleUser(user, ns)).get(); + } + } + assertNumResults(6, null); + + for (TableName table : tables) { + admin.setQuota(QuotaSettingsFactory.unthrottleTable(table)).get(); + } + assertNumResults(3, null); + + for (String ns : namespaces) { + admin.setQuota(QuotaSettingsFactory.unthrottleNamespace(ns)).get(); + } + assertNumResults(0, null); + } + + private void assertNumResults(int expected, final QuotaFilter filter) throws Exception { + assertEquals(expected, countResults(filter)); + } + + private int countResults(final QuotaFilter filter) throws Exception { + int count = 0; + for (QuotaSettings settings : admin.getQuota(filter).get()) { + LOG.debug(settings); + count++; + } + return count; + } +}