diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java new file mode 100644 index 00000000000..d70694973c6 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * For creating {@link AsyncAdmin}. The implementation should have default configurations set before + * returning the builder to user. So users are free to only set the configs they care about to + * create a new AsyncAdmin instance. + */ +@InterfaceAudience.Public +public interface AsyncAdminBuilder { + + /** + * Set timeout for a whole admin operation. Operation timeout and max attempt times(or max retry + * times) are both limitations for retrying, we will stop retrying when we reach any of the + * limitations. + * @param timeout + * @param unit + * @return this for invocation chaining + */ + AsyncAdminBuilder setOperationTimeout(long timeout, TimeUnit unit); + + /** + * Set timeout for each rpc request. + * @param timeout + * @param unit + * @return this for invocation chaining + */ + AsyncAdminBuilder setRpcTimeout(long timeout, TimeUnit unit); + + /** + * Set the base pause time for retrying. We use an exponential policy to generate sleep time when + * retrying. + * @param timeout + * @param unit + * @return this for invocation chaining + */ + AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit); + + /** + * Set the max retry times for an admin operation. Usually it is the max attempt times minus 1. + * Operation timeout and max attempt times(or max retry times) are both limitations for retrying, + * we will stop retrying when we reach any of the limitations. + * @param maxRetries + * @return this for invocation chaining + */ + default AsyncAdminBuilder setMaxRetries(int maxRetries) { + return setMaxAttempts(retries2Attempts(maxRetries)); + } + + /** + * Set the max attempt times for an admin operation. Usually it is the max retry times plus 1. + * Operation timeout and max attempt times(or max retry times) are both limitations for retrying, + * we will stop retrying when we reach any of the limitations. + * @param maxAttempts + * @return this for invocation chaining + */ + AsyncAdminBuilder setMaxAttempts(int maxAttempts); + + /** + * Set the number of retries that are allowed before we start to log. + * @param startLogErrorsCnt + * @return this for invocation chaining + */ + AsyncAdminBuilder setStartLogErrorsCnt(int startLogErrorsCnt); + + /** + * Create a {@link AsyncAdmin} instance. + * @return a {@link AsyncAdmin} instance + */ + T build(); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java new file mode 100644 index 00000000000..013e8d71e36 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Base class for all asynchronous admin builders. + */ +@InterfaceAudience.Private +abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder { + + protected long rpcTimeoutNs; + + protected long operationTimeoutNs; + + protected long pauseNs; + + protected int maxAttempts; + + protected int startLogErrorsCnt; + + AsyncAdminBuilderBase(AsyncConnectionConfiguration connConf) { + this.rpcTimeoutNs = connConf.getRpcTimeoutNs(); + this.operationTimeoutNs = connConf.getOperationTimeoutNs(); + this.pauseNs = connConf.getPauseNs(); + this.maxAttempts = connConf.getMaxRetries(); + this.startLogErrorsCnt = connConf.getStartLogErrorsCnt(); + } + + @Override + public AsyncAdminBuilder setOperationTimeout(long timeout, TimeUnit unit) { + this.operationTimeoutNs = unit.toNanos(timeout); + return this; + } + + @Override + public AsyncAdminBuilder setRpcTimeout(long timeout, TimeUnit unit) { + this.rpcTimeoutNs = unit.toNanos(timeout); + return this; + } + + @Override + public AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit) { + this.pauseNs = unit.toNanos(timeout); + return this; + } + + @Override + public AsyncAdminBuilder setMaxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + @Override + public AsyncAdminBuilder setStartLogErrorsCnt(int startLogErrorsCnt) { + this.startLogErrorsCnt = startLogErrorsCnt; + return this; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java index 22ed0649907..04ef78e58ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java @@ -96,17 +96,44 @@ public interface AsyncConnection extends Closeable { AsyncTableBuilder getTableBuilder(TableName tableName, ExecutorService pool); /** - * Retrieve an AsyncAdmin implementation to administer an HBase cluster. The returned - * {@code CompletableFuture} will be finished directly in the rpc framework's callback thread, so - * typically you should not do any time consuming work inside these methods. - * @return an AsyncAdmin instance for cluster administration + * Retrieve an {@link AsyncAdmin} implementation to administer an HBase cluster. + *

+ * The returned instance will use default configs. Use {@link #getAdminBuilder()} if you want to + * customize some configs. + *

+ * The admin operation's returned {@code CompletableFuture} will be finished directly in the rpc + * framework's callback thread, so typically you should not do any time consuming work inside + * these methods. + * @return an {@link AsyncAdmin} instance for cluster administration */ - AsyncAdmin getAdmin(); + default AsyncAdmin getAdmin() { + return getAdminBuilder().build(); + } /** - * Retrieve an AsyncAdmin implementation to administer an HBase cluster. - * @param pool the thread pool to use for executing callback - * @return an AsyncAdmin instance for cluster administration + * Returns an {@link AsyncAdminBuilder} for creating {@link AsyncAdmin}. + *

+ * The admin operation's returned {@code CompletableFuture} will be finished directly in the rpc + * framework's callback thread, so typically you should not do any time consuming work inside + * these methods. */ - AsyncAdmin getAdmin(ExecutorService pool); + AsyncAdminBuilder getAdminBuilder(); + + /** + * Retrieve an {@link AsyncAdmin} implementation to administer an HBase cluster. + *

+ * The returned instance will use default configs. Use {@link #getAdminBuilder(ExecutorService)} + * if you want to customize some configs. + * @param pool the thread pool to use for executing callback + * @return an {@link AsyncAdmin} instance for cluster administration + */ + default AsyncAdmin getAdmin(ExecutorService pool) { + return getAdminBuilder(pool).build(); + } + + /** + * Returns an {@link AsyncAdminBuilder} for creating {@link AsyncAdmin}. + * @param pool the thread pool to use for executing callback + */ + AsyncAdminBuilder getAdminBuilder(ExecutorService pool); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index c170bce14ed..5dd40cc3446 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -278,12 +278,23 @@ class AsyncConnectionImpl implements AsyncConnection { } @Override - public AsyncAdmin getAdmin() { - return new RawAsyncHBaseAdmin(this); + public AsyncAdminBuilder getAdminBuilder() { + return new AsyncAdminBuilderBase(connConf) { + @Override + public RawAsyncHBaseAdmin build() { + return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, this); + } + }; } @Override - public AsyncAdmin getAdmin(ExecutorService pool) { - return new AsyncHBaseAdmin(new RawAsyncHBaseAdmin(this), pool); + public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) { + return new AsyncAdminBuilderBase(connConf) { + @Override + public AsyncHBaseAdmin build() { + RawAsyncHBaseAdmin rawAdmin = new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, this); + return new AsyncHBaseAdmin(rawAdmin, pool); + } + }; } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index fcfdf9378a1..179fd7d0e77 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -212,14 +212,14 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { private final NonceGenerator ng; - RawAsyncHBaseAdmin(AsyncConnectionImpl connection) { + RawAsyncHBaseAdmin(AsyncConnectionImpl connection, AsyncAdminBuilderBase builder) { this.connection = connection; this.metaTable = connection.getRawTable(META_TABLE_NAME); - this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs(); - this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs(); - this.pauseNs = connection.connConf.getPauseNs(); - this.maxAttempts = connection.connConf.getMaxRetries(); - this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt(); + this.rpcTimeoutNs = builder.rpcTimeoutNs; + this.operationTimeoutNs = builder.operationTimeoutNs; + this.pauseNs = builder.pauseNs; + this.maxAttempts = builder.maxAttempts; + this.startLogErrorsCnt = builder.startLogErrorsCnt; this.ng = connection.getNonceGenerator(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java new file mode 100644 index 00000000000..ea25ee4a253 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; +import static org.apache.hadoop.hbase.NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.MasterObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ LargeTests.class, ClientTests.class }) +public class TestAsyncAdminBuilder { + + private static final Log LOG = LogFactory.getLog(TestAsyncAdminBuilder.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static AsyncConnection ASYNC_CONN; + + @Parameter + public Supplier> getAdminBuilder; + + private static AsyncAdminBuilder getRawAsyncAdminBuilder() { + return ASYNC_CONN.getAdminBuilder(); + } + + private static AsyncAdminBuilder getAsyncAdminBuilder() { + return ASYNC_CONN.getAdminBuilder(ForkJoinPool.commonPool()); + } + + @Parameters + public static List params() { + return Arrays.asList(new Supplier[] { TestAsyncAdminBuilder::getRawAsyncAdminBuilder }, + new Supplier[] { TestAsyncAdminBuilder::getAsyncAdminBuilder }); + } + + private static final int DEFAULT_RPC_TIMEOUT = 10000; + private static final int DEFAULT_OPERATION_TIMEOUT = 30000; + private static final int DEFAULT_RETRIES_NUMBER = 2; + + @Before + public void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, DEFAULT_RPC_TIMEOUT); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + DEFAULT_OPERATION_TIMEOUT); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + DEFAULT_RETRIES_NUMBER); + TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); + } + + @After + public void tearDown() throws Exception { + IOUtils.closeQuietly(ASYNC_CONN); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testRpcTimeout() throws Exception { + TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + TestRpcTimeoutCoprocessor.class.getName()); + TEST_UTIL.startMiniCluster(2); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + + try { + getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT / 2, TimeUnit.MILLISECONDS).build() + .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); + fail("We expect an exception here"); + } catch (Exception e) { + // expected + } + + try { + getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT * 2, TimeUnit.MILLISECONDS).build() + .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); + } catch (Exception e) { + fail("The Operation should succeed, unexpected exception: " + e.getMessage()); + } + } + + @Test + public void testOperationTimeout() throws Exception { + // set retry number to 100 to make sure that this test only be affected by operation timeout + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); + TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + TestOperationTimeoutCoprocessor.class.getName()); + TEST_UTIL.startMiniCluster(2); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + + try { + getAdminBuilder.get() + .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT / 2, TimeUnit.MILLISECONDS).build() + .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); + fail("We expect an exception here"); + } catch (Exception e) { + // expected + } + + try { + getAdminBuilder.get() + .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT * 2, TimeUnit.MILLISECONDS).build() + .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); + } catch (Exception e) { + fail("The Operation should succeed, unexpected exception: " + e.getMessage()); + } + } + + @Test + public void testMaxRetries() throws Exception { + // set operation timeout to 300s to make sure that this test only be affected by retry number + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 300000); + TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + TestMaxRetriesCoprocessor.class.getName()); + TEST_UTIL.startMiniCluster(2); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + + try { + getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER / 2).build() + .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); + fail("We expect an exception here"); + } catch (Exception e) { + // expected + } + + try { + getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER * 2).build() + .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); + } catch (Exception e) { + fail("The Operation should succeed, unexpected exception: " + e.getMessage()); + } + } + + public static class TestRpcTimeoutCoprocessor implements MasterObserver { + public TestRpcTimeoutCoprocessor() { + } + + @Override + public void preGetNamespaceDescriptor(ObserverContext ctx, + String namespace) throws IOException { + Threads.sleep(DEFAULT_RPC_TIMEOUT); + } + } + + public static class TestOperationTimeoutCoprocessor implements MasterObserver { + AtomicLong sleepTime = new AtomicLong(0); + + public TestOperationTimeoutCoprocessor() { + } + + @Override + public void preGetNamespaceDescriptor(ObserverContext ctx, + String namespace) throws IOException { + Threads.sleep(DEFAULT_RPC_TIMEOUT / 2); + if (sleepTime.addAndGet(DEFAULT_RPC_TIMEOUT / 2) < DEFAULT_OPERATION_TIMEOUT) { + throw new IOException("call fail"); + } + } + } + + public static class TestMaxRetriesCoprocessor implements MasterObserver { + AtomicLong retryNum = new AtomicLong(0); + + public TestMaxRetriesCoprocessor() { + } + + @Override + public void preGetNamespaceDescriptor(ObserverContext ctx, + String namespace) throws IOException { + if (retryNum.getAndIncrement() < DEFAULT_RETRIES_NUMBER) { + throw new IOException("call fail"); + } + } + } +} \ No newline at end of file