HBASE-18297 Provide a AsyncAdminBuilder to create new AsyncAdmin instance

This commit is contained in:
Guanghao Zhang 2017-07-05 09:18:02 +08:00
parent 63607800cd
commit e71e5ece88
6 changed files with 441 additions and 19 deletions

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* For creating {@link AsyncAdmin}. The implementation should have default configurations set before
* returning the builder to user. So users are free to only set the configs they care about to
* create a new AsyncAdmin instance.
*/
@InterfaceAudience.Public
public interface AsyncAdminBuilder<T extends AsyncAdmin> {
/**
* Set timeout for a whole admin operation. Operation timeout and max attempt times(or max retry
* times) are both limitations for retrying, we will stop retrying when we reach any of the
* limitations.
* @param timeout
* @param unit
* @return this for invocation chaining
*/
AsyncAdminBuilder<T> setOperationTimeout(long timeout, TimeUnit unit);
/**
* Set timeout for each rpc request.
* @param timeout
* @param unit
* @return this for invocation chaining
*/
AsyncAdminBuilder<T> setRpcTimeout(long timeout, TimeUnit unit);
/**
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
* retrying.
* @param timeout
* @param unit
* @return this for invocation chaining
*/
AsyncAdminBuilder<T> setRetryPause(long timeout, TimeUnit unit);
/**
* Set the max retry times for an admin operation. Usually it is the max attempt times minus 1.
* Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
* we will stop retrying when we reach any of the limitations.
* @param maxRetries
* @return this for invocation chaining
*/
default AsyncAdminBuilder<T> setMaxRetries(int maxRetries) {
return setMaxAttempts(retries2Attempts(maxRetries));
}
/**
* Set the max attempt times for an admin operation. Usually it is the max retry times plus 1.
* Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
* we will stop retrying when we reach any of the limitations.
* @param maxAttempts
* @return this for invocation chaining
*/
AsyncAdminBuilder<T> setMaxAttempts(int maxAttempts);
/**
* Set the number of retries that are allowed before we start to log.
* @param startLogErrorsCnt
* @return this for invocation chaining
*/
AsyncAdminBuilder<T> setStartLogErrorsCnt(int startLogErrorsCnt);
/**
* Create a {@link AsyncAdmin} instance.
* @return a {@link AsyncAdmin} instance
*/
T build();
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Base class for all asynchronous admin builders.
*/
@InterfaceAudience.Private
abstract class AsyncAdminBuilderBase<T extends AsyncAdmin> implements AsyncAdminBuilder<T> {
protected long rpcTimeoutNs;
protected long operationTimeoutNs;
protected long pauseNs;
protected int maxAttempts;
protected int startLogErrorsCnt;
AsyncAdminBuilderBase(AsyncConnectionConfiguration connConf) {
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
this.operationTimeoutNs = connConf.getOperationTimeoutNs();
this.pauseNs = connConf.getPauseNs();
this.maxAttempts = connConf.getMaxRetries();
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
}
@Override
public AsyncAdminBuilder<T> setOperationTimeout(long timeout, TimeUnit unit) {
this.operationTimeoutNs = unit.toNanos(timeout);
return this;
}
@Override
public AsyncAdminBuilder<T> setRpcTimeout(long timeout, TimeUnit unit) {
this.rpcTimeoutNs = unit.toNanos(timeout);
return this;
}
@Override
public AsyncAdminBuilder<T> setRetryPause(long timeout, TimeUnit unit) {
this.pauseNs = unit.toNanos(timeout);
return this;
}
@Override
public AsyncAdminBuilder<T> setMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
}
@Override
public AsyncAdminBuilder<T> setStartLogErrorsCnt(int startLogErrorsCnt) {
this.startLogErrorsCnt = startLogErrorsCnt;
return this;
}
}

View File

@ -96,17 +96,44 @@ public interface AsyncConnection extends Closeable {
AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool);
/**
* Retrieve an AsyncAdmin implementation to administer an HBase cluster. The returned
* {@code CompletableFuture} will be finished directly in the rpc framework's callback thread, so
* typically you should not do any time consuming work inside these methods.
* @return an AsyncAdmin instance for cluster administration
* Retrieve an {@link AsyncAdmin} implementation to administer an HBase cluster.
* <p>
* The returned instance will use default configs. Use {@link #getAdminBuilder()} if you want to
* customize some configs.
* <p>
* The admin operation's returned {@code CompletableFuture} will be finished directly in the rpc
* framework's callback thread, so typically you should not do any time consuming work inside
* these methods.
* @return an {@link AsyncAdmin} instance for cluster administration
*/
AsyncAdmin getAdmin();
default AsyncAdmin getAdmin() {
return getAdminBuilder().build();
}
/**
* Retrieve an AsyncAdmin implementation to administer an HBase cluster.
* @param pool the thread pool to use for executing callback
* @return an AsyncAdmin instance for cluster administration
* Returns an {@link AsyncAdminBuilder} for creating {@link AsyncAdmin}.
* <p>
* The admin operation's returned {@code CompletableFuture} will be finished directly in the rpc
* framework's callback thread, so typically you should not do any time consuming work inside
* these methods.
*/
AsyncAdmin getAdmin(ExecutorService pool);
AsyncAdminBuilder<RawAsyncHBaseAdmin> getAdminBuilder();
/**
* Retrieve an {@link AsyncAdmin} implementation to administer an HBase cluster.
* <p>
* The returned instance will use default configs. Use {@link #getAdminBuilder(ExecutorService)}
* if you want to customize some configs.
* @param pool the thread pool to use for executing callback
* @return an {@link AsyncAdmin} instance for cluster administration
*/
default AsyncAdmin getAdmin(ExecutorService pool) {
return getAdminBuilder(pool).build();
}
/**
* Returns an {@link AsyncAdminBuilder} for creating {@link AsyncAdmin}.
* @param pool the thread pool to use for executing callback
*/
AsyncAdminBuilder<AsyncHBaseAdmin> getAdminBuilder(ExecutorService pool);
}

View File

@ -278,12 +278,23 @@ class AsyncConnectionImpl implements AsyncConnection {
}
@Override
public AsyncAdmin getAdmin() {
return new RawAsyncHBaseAdmin(this);
public AsyncAdminBuilder<RawAsyncHBaseAdmin> getAdminBuilder() {
return new AsyncAdminBuilderBase<RawAsyncHBaseAdmin>(connConf) {
@Override
public RawAsyncHBaseAdmin build() {
return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, this);
}
};
}
@Override
public AsyncAdmin getAdmin(ExecutorService pool) {
return new AsyncHBaseAdmin(new RawAsyncHBaseAdmin(this), pool);
public AsyncAdminBuilder<AsyncHBaseAdmin> getAdminBuilder(ExecutorService pool) {
return new AsyncAdminBuilderBase<AsyncHBaseAdmin>(connConf) {
@Override
public AsyncHBaseAdmin build() {
RawAsyncHBaseAdmin rawAdmin = new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, this);
return new AsyncHBaseAdmin(rawAdmin, pool);
}
};
}
}

View File

@ -212,14 +212,14 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
private final NonceGenerator ng;
RawAsyncHBaseAdmin(AsyncConnectionImpl connection) {
RawAsyncHBaseAdmin(AsyncConnectionImpl connection, AsyncAdminBuilderBase<?> builder) {
this.connection = connection;
this.metaTable = connection.getRawTable(META_TABLE_NAME);
this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs();
this.pauseNs = connection.connConf.getPauseNs();
this.maxAttempts = connection.connConf.getMaxRetries();
this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt();
this.rpcTimeoutNs = builder.rpcTimeoutNs;
this.operationTimeoutNs = builder.operationTimeoutNs;
this.pauseNs = builder.pauseNs;
this.maxAttempts = builder.maxAttempts;
this.startLogErrorsCnt = builder.startLogErrorsCnt;
this.ng = connection.getNonceGenerator();
}

View File

@ -0,0 +1,214 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.apache.hadoop.hbase.NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncAdminBuilder {
private static final Log LOG = LogFactory.getLog(TestAsyncAdminBuilder.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static AsyncConnection ASYNC_CONN;
@Parameter
public Supplier<AsyncAdminBuilder<?>> getAdminBuilder;
private static AsyncAdminBuilder<RawAsyncHBaseAdmin> getRawAsyncAdminBuilder() {
return ASYNC_CONN.getAdminBuilder();
}
private static AsyncAdminBuilder<AsyncHBaseAdmin> getAsyncAdminBuilder() {
return ASYNC_CONN.getAdminBuilder(ForkJoinPool.commonPool());
}
@Parameters
public static List<Object[]> params() {
return Arrays.asList(new Supplier<?>[] { TestAsyncAdminBuilder::getRawAsyncAdminBuilder },
new Supplier<?>[] { TestAsyncAdminBuilder::getAsyncAdminBuilder });
}
private static final int DEFAULT_RPC_TIMEOUT = 10000;
private static final int DEFAULT_OPERATION_TIMEOUT = 30000;
private static final int DEFAULT_RETRIES_NUMBER = 2;
@Before
public void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, DEFAULT_RPC_TIMEOUT);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
DEFAULT_OPERATION_TIMEOUT);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
DEFAULT_RETRIES_NUMBER);
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
}
@After
public void tearDown() throws Exception {
IOUtils.closeQuietly(ASYNC_CONN);
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testRpcTimeout() throws Exception {
TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
TestRpcTimeoutCoprocessor.class.getName());
TEST_UTIL.startMiniCluster(2);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
try {
getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT / 2, TimeUnit.MILLISECONDS).build()
.getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
fail("We expect an exception here");
} catch (Exception e) {
// expected
}
try {
getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT * 2, TimeUnit.MILLISECONDS).build()
.getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
} catch (Exception e) {
fail("The Operation should succeed, unexpected exception: " + e.getMessage());
}
}
@Test
public void testOperationTimeout() throws Exception {
// set retry number to 100 to make sure that this test only be affected by operation timeout
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100);
TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
TestOperationTimeoutCoprocessor.class.getName());
TEST_UTIL.startMiniCluster(2);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
try {
getAdminBuilder.get()
.setOperationTimeout(DEFAULT_OPERATION_TIMEOUT / 2, TimeUnit.MILLISECONDS).build()
.getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
fail("We expect an exception here");
} catch (Exception e) {
// expected
}
try {
getAdminBuilder.get()
.setOperationTimeout(DEFAULT_OPERATION_TIMEOUT * 2, TimeUnit.MILLISECONDS).build()
.getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
} catch (Exception e) {
fail("The Operation should succeed, unexpected exception: " + e.getMessage());
}
}
@Test
public void testMaxRetries() throws Exception {
// set operation timeout to 300s to make sure that this test only be affected by retry number
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 300000);
TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
TestMaxRetriesCoprocessor.class.getName());
TEST_UTIL.startMiniCluster(2);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
try {
getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER / 2).build()
.getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
fail("We expect an exception here");
} catch (Exception e) {
// expected
}
try {
getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER * 2).build()
.getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
} catch (Exception e) {
fail("The Operation should succeed, unexpected exception: " + e.getMessage());
}
}
public static class TestRpcTimeoutCoprocessor implements MasterObserver {
public TestRpcTimeoutCoprocessor() {
}
@Override
public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
String namespace) throws IOException {
Threads.sleep(DEFAULT_RPC_TIMEOUT);
}
}
public static class TestOperationTimeoutCoprocessor implements MasterObserver {
AtomicLong sleepTime = new AtomicLong(0);
public TestOperationTimeoutCoprocessor() {
}
@Override
public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
String namespace) throws IOException {
Threads.sleep(DEFAULT_RPC_TIMEOUT / 2);
if (sleepTime.addAndGet(DEFAULT_RPC_TIMEOUT / 2) < DEFAULT_OPERATION_TIMEOUT) {
throw new IOException("call fail");
}
}
}
public static class TestMaxRetriesCoprocessor implements MasterObserver {
AtomicLong retryNum = new AtomicLong(0);
public TestMaxRetriesCoprocessor() {
}
@Override
public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
String namespace) throws IOException {
if (retryNum.getAndIncrement() < DEFAULT_RETRIES_NUMBER) {
throw new IOException("call fail");
}
}
}
}