HBASE-16991 Make the initialization of AsyncConnection asynchronous

zhangduo 2017-02-23 18:35:40 +08:00
parent 371f2bd907
commit 62de29e6f2
18 changed files with 131 additions and 126 deletions
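The user-visible change is that ConnectionFactory.createAsyncConnection(...) now returns a CompletableFuture<AsyncConnection> instead of an AsyncConnection, because the zookeeper cluster id lookup no longer happens synchronously during construction. A minimal caller-side sketch of the new flow (the class name, table name and row key are placeholders, not part of this commit; connection close is omitted for brevity):

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncConnectionExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Before this commit the factory returned AsyncConnection directly and blocked on
    // zookeeper for the cluster id; now it returns a CompletableFuture<AsyncConnection>.
    CompletableFuture<Void> done = ConnectionFactory.createAsyncConnection(conf)
        .thenCompose(conn -> conn.getRawTable(TableName.valueOf("myTable")) // placeholder table
            .get(new Get(Bytes.toBytes("someRow")))                         // placeholder row
            .thenAccept(result -> System.out.println(result)));
    done.join(); // block only so main() does not exit before the async chain finishes
  }
}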

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
@ -27,7 +26,6 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -79,8 +77,6 @@ class AsyncConnectionImpl implements AsyncConnection {
final AsyncRegistry registry;
private final String clusterId;
private final int rpcTimeout;
private final RpcClient rpcClient;
@ -103,17 +99,12 @@ class AsyncConnectionImpl implements AsyncConnection {
private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
new AtomicReference<>();
public AsyncConnectionImpl(Configuration conf, User user) {
public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
User user) {
this.conf = conf;
this.user = user;
this.connConf = new AsyncConnectionConfiguration(conf);
this.registry = AsyncRegistryFactory.getRegistry(conf);
this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
if (LOG.isDebugEnabled()) {
LOG.debug("cluster id came back null, using default " + CLUSTER_ID_DEFAULT);
}
return CLUSTER_ID_DEFAULT;
});
this.registry = registry;
this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
@ -145,11 +136,13 @@ class AsyncConnectionImpl implements AsyncConnection {
}
// we will override this method for testing the retry caller, so do not remove this method.
@VisibleForTesting
AsyncRegionLocator getLocator() {
return locator;
}
// ditto
@VisibleForTesting
public NonceGenerator getNonceGenerator() {
return nonceGenerator;
}
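With the cluster id lookup moved out of the constructor above, AsyncConnectionImpl is now handed an already-created AsyncRegistry plus a resolved cluster id. The updated tests further down follow exactly this pattern; roughly (the helper name is made up, and blocking on get() is acceptable in test setup):

// Hypothetical helper; it must live in org.apache.hadoop.hbase.client since
// AsyncConnectionImpl and AsyncRegistryFactory are package-private.
static AsyncConnectionImpl createConnForTest(Configuration conf) throws Exception {
  AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
  String clusterId = registry.getClusterId().get(); // blocking on the future in test setup
  return new AsyncConnectionImpl(conf, registry, clusterId, User.getCurrent());
}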

View File

@ -28,9 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
* Implementations hold cluster information such as this cluster's id, the location of hbase:meta, etc.
* Everything that may be related to zookeeper on the client side is placed here.
* <p>
* Most methods are executed asynchronously except getClusterId. It will be executed synchronously
* and should be called only once, during initialization.
* <p>
* Internal use only.
*/
@InterfaceAudience.Private
@ -46,7 +43,7 @@ interface AsyncRegistry extends Closeable {
* <p>
* The upper layer should store this value somewhere as it will not change any more.
*/
String getClusterId();
CompletableFuture<String> getClusterId();
/**
* Get the number of 'running' regionservers.
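Because getClusterId() above is now asynchronous, a caller honours the "fetch once and store it" contract by chaining on the future instead of blocking. A small illustrative sketch (the field, method name and LOG are hypothetical, not from this commit):

// Hypothetical caller: resolve the cluster id once, cache it, never ask again.
private volatile String cachedClusterId;

void start(AsyncRegistry registry) {
  registry.getClusterId().whenComplete((clusterId, error) -> {
    if (error != null || clusterId == null) {
      LOG.warn("failed to fetch cluster id", error);
      return;
    }
    cachedClusterId = clusterId; // the id never changes, so fetching and caching it once is enough
  });
}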

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
@ -30,13 +31,12 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* A non-instantiable class that manages creation of {@link Connection}s.
* Managing the lifecycle of the {@link Connection}s to the cluster is the responsibility of
* the caller.
* From a {@link Connection}, {@link Table} implementations are retrieved
* with {@link Connection#getTable(TableName)}. Example:
* A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
* the {@link Connection}s to the cluster is the responsibility of the caller. From a
* {@link Connection}, {@link Table} implementations are retrieved with
* {@link Connection#getTable(TableName)}. Example:
*
* <pre>
* Connection connection = ConnectionFactory.createConnection(config);
* Table table = connection.getTable(TableName.valueOf("table1"));
@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
*
* Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
* implementations.
*
* @see Connection
* @since 0.99.0
*/
@ -58,23 +57,20 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
@InterfaceStability.Evolving
public class ConnectionFactory {
public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
"hbase.client.async.connection.impl";
public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = "hbase.client.async.connection.impl";
/** No public c.tors */
protected ConnectionFactory() {
}
/**
* Create a new Connection instance using default HBaseConfiguration. Connection
* encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
* created from returned connection share zookeeper connection, meta cache, and connections
* to region servers and masters.
* <br>
* The caller is responsible for calling {@link Connection#close()} on the returned
* connection instance.
* Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
* housekeeping for a connection to the cluster. All tables and interfaces created from returned
* connection share zookeeper connection, meta cache, and connections to region servers and
* masters. <br>
* The caller is responsible for calling {@link Connection#close()} on the returned connection
* instance. Typical usage:
*
* Typical usage:
* <pre>
* Connection connection = ConnectionFactory.createConnection();
* Table table = connection.getTable(TableName.valueOf("mytable"));
@ -96,13 +92,11 @@ public class ConnectionFactory {
/**
* Create a new Connection instance using the passed <code>conf</code> instance. Connection
* encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
* created from returned connection share zookeeper connection, meta cache, and connections
* to region servers and masters.
* <br>
* The caller is responsible for calling {@link Connection#close()} on the returned
* connection instance.
* created from returned connection share zookeeper connection, meta cache, and connections to
* region servers and masters. <br>
* The caller is responsible for calling {@link Connection#close()} on the returned connection
* instance. Typical usage:
*
* Typical usage:
* <pre>
* Connection connection = ConnectionFactory.createConnection(conf);
* Table table = connection.getTable(TableName.valueOf("mytable"));
@ -125,13 +119,11 @@ public class ConnectionFactory {
/**
* Create a new Connection instance using the passed <code>conf</code> instance. Connection
* encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
* created from returned connection share zookeeper connection, meta cache, and connections
* to region servers and masters.
* <br>
* The caller is responsible for calling {@link Connection#close()} on the returned
* connection instance.
* created from returned connection share zookeeper connection, meta cache, and connections to
* region servers and masters. <br>
* The caller is responsible for calling {@link Connection#close()} on the returned connection
* instance. Typical usage:
*
* Typical usage:
* <pre>
* Connection connection = ConnectionFactory.createConnection(conf);
* Table table = connection.getTable(TableName.valueOf("mytable"));
@ -156,13 +148,11 @@ public class ConnectionFactory {
/**
* Create a new Connection instance using the passed <code>conf</code> instance. Connection
* encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
* created from returned connection share zookeeper connection, meta cache, and connections
* to region servers and masters.
* <br>
* The caller is responsible for calling {@link Connection#close()} on the returned
* connection instance.
* created from returned connection share zookeeper connection, meta cache, and connections to
* region servers and masters. <br>
* The caller is responsible for calling {@link Connection#close()} on the returned connection
* instance. Typical usage:
*
* Typical usage:
* <pre>
* Connection connection = ConnectionFactory.createConnection(conf);
* Table table = connection.getTable(TableName.valueOf("table1"));
@ -179,21 +169,18 @@ public class ConnectionFactory {
* @param user the user the connection is for
* @return Connection object for <code>conf</code>
*/
public static Connection createConnection(Configuration conf, User user)
throws IOException {
public static Connection createConnection(Configuration conf, User user) throws IOException {
return createConnection(conf, null, user);
}
/**
* Create a new Connection instance using the passed <code>conf</code> instance. Connection
* encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
* created from returned connection share zookeeper connection, meta cache, and connections
* to region servers and masters.
* <br>
* The caller is responsible for calling {@link Connection#close()} on the returned
* connection instance.
* created from returned connection share zookeeper connection, meta cache, and connections to
* region servers and masters. <br>
* The caller is responsible for calling {@link Connection#close()} on the returned connection
* instance. Typical usage:
*
* Typical usage:
* <pre>
* Connection connection = ConnectionFactory.createConnection(conf);
* Table table = connection.getTable(TableName.valueOf("table1"));
@ -212,7 +199,7 @@ public class ConnectionFactory {
* @return Connection object for <code>conf</code>
*/
public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
throws IOException {
throws IOException {
if (user == null) {
UserProvider provider = UserProvider.instantiate(conf);
user = provider.getCurrent();
@ -228,9 +215,8 @@ public class ConnectionFactory {
}
try {
// Default HCM#HCI is not accessible; make it so before invoking.
Constructor<?> constructor =
clazz.getDeclaredConstructor(Configuration.class,
ExecutorService.class, User.class);
Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
ExecutorService.class, User.class);
constructor.setAccessible(true);
return (Connection) constructor.newInstance(conf, pool, user);
} catch (Exception e) {
@ -241,9 +227,9 @@ public class ConnectionFactory {
/**
* Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
* @see #createAsyncConnection(Configuration)
* @return AsyncConnection object
* @return AsyncConnection object wrapped by CompletableFuture
*/
public static AsyncConnection createAsyncConnection() throws IOException {
public static CompletableFuture<AsyncConnection> createAsyncConnection() {
return createAsyncConnection(HBaseConfiguration.create());
}
@ -252,12 +238,20 @@ public class ConnectionFactory {
* User object created by {@link UserProvider}. The given {@code conf} will also be used to
* initialize the {@link UserProvider}.
* @param conf configuration
* @return AsyncConnection object
* @return AsyncConnection object wrapped by CompletableFuture
* @see #createAsyncConnection(Configuration, User)
* @see UserProvider
*/
public static AsyncConnection createAsyncConnection(Configuration conf) throws IOException {
return createAsyncConnection(conf, UserProvider.instantiate(conf).getCurrent());
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
User user;
try {
user = UserProvider.instantiate(conf).getCurrent();
} catch (IOException e) {
CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
return createAsyncConnection(conf, user);
}
/**
@ -273,17 +267,30 @@ public class ConnectionFactory {
* as it is thread safe.
* @param conf configuration
* @param user the user the asynchronous connection is for
* @return AsyncConnection object
* @return AsyncConnection object wrapped by CompletableFuture
* @throws IOException
*/
public static AsyncConnection createAsyncConnection(Configuration conf, User user)
throws IOException {
Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
AsyncConnectionImpl.class, AsyncConnection.class);
try {
return ReflectionUtils.newInstance(clazz, conf, user);
} catch (Exception e) {
throw new IOException(e);
}
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
User user) {
CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
registry.getClusterId().whenComplete((clusterId, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
if (clusterId == null) {
future.completeExceptionally(new IOException("clusterid came back null"));
return;
}
Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
AsyncConnectionImpl.class, AsyncConnection.class);
try {
future.complete(ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user));
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future;
}
}
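Callers that still want the old blocking behaviour (as the updated tests below do) simply join the returned future; note that setup failures then arrive wrapped in an ExecutionException rather than as a direct IOException. A minimal sketch of such an adapter (the method name is made up):

// Hypothetical adapter restoring the old blocking contract.
static AsyncConnection createAsyncConnectionBlocking(Configuration conf, User user)
    throws IOException {
  try {
    return ConnectionFactory.createAsyncConnection(conf, user).get();
  } catch (ExecutionException e) {
    // unwrap the original failure, e.g. the UserProvider IOException or the null cluster id
    throw new IOException(e.getCause());
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw (IOException) new InterruptedIOException().initCause(e);
  }
}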

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
@ -53,8 +54,7 @@ import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.data.Stat;
/**
* Cache the cluster registry data in memory and use zk watcher to update. The only exception is
* {@link #getClusterId()}, which will fetch the data from zk directly.
* Fetch the registry data from zookeeper.
*/
@InterfaceAudience.Private
class ZKAsyncRegistry implements AsyncRegistry {
@ -79,26 +79,6 @@ class ZKAsyncRegistry implements AsyncRegistry {
this.zk.start();
}
@Override
public String getClusterId() {
try {
byte[] data = zk.getData().forPath(znodePaths.clusterIdZNode);
if (data == null || data.length == 0) {
return null;
}
data = removeMetaData(data);
return ClusterId.parseFrom(data).toString();
} catch (Exception e) {
LOG.warn("failed to get cluster id", e);
return null;
}
}
@Override
public void close() {
zk.close();
}
private interface CuratorEventProcessor<T> {
T process(CuratorEvent event) throws Exception;
}
@ -120,6 +100,20 @@ class ZKAsyncRegistry implements AsyncRegistry {
return future;
}
private static String getClusterId(CuratorEvent event) throws DeserializationException {
byte[] data = event.getData();
if (data == null || data.length == 0) {
return null;
}
data = removeMetaData(data);
return ClusterId.parseFrom(data).toString();
}
@Override
public CompletableFuture<String> getClusterId() {
return exec(zk.getData(), znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId);
}
private static ZooKeeperProtos.MetaRegionServer getMetaProto(CuratorEvent event)
throws IOException {
byte[] data = event.getData();
@ -249,4 +243,9 @@ class ZKAsyncRegistry implements AsyncRegistry {
return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
.thenApply(proto -> proto != null ? proto.getInfoPort() : 0);
}
@Override
public void close() {
zk.close();
}
}
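The getClusterId() rewrite above funnels through the class's existing exec(...) helper, whose body falls outside this hunk. For orientation, a Curator-background-callback-to-CompletableFuture bridge of that kind looks roughly like the following (a sketch with an assumed signature, not the committed implementation):

// Sketch: run a Curator read in the background and complete a future from its callback.
// Assumes imports of org.apache.curator.framework.api.BackgroundPathable and
// org.apache.zookeeper.KeeperException in addition to CompletableFuture.
private static <T> CompletableFuture<T> exec(BackgroundPathable<?> opBuilder, String path,
    CuratorEventProcessor<T> processor) {
  CompletableFuture<T> future = new CompletableFuture<>();
  try {
    opBuilder.inBackground((client, event) -> {
      if (event.getResultCode() != KeeperException.Code.OK.intValue()) {
        future.completeExceptionally(
          KeeperException.create(KeeperException.Code.get(event.getResultCode()), path));
        return;
      }
      try {
        future.complete(processor.process(event));
      } catch (Exception e) {
        future.completeExceptionally(e);
      }
    }).forPath(path);
  } catch (Exception e) {
    future.completeExceptionally(e);
  }
  return future;
}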

View File

@ -68,7 +68,7 @@ public class TestAsyncAggregationClient {
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
}
UTIL.createTable(TABLE_NAME, CF, splitKeys);
CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration());
CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
TABLE = CONN.getRawTable(TABLE_NAME);
TABLE.putAll(LongStream.range(0, COUNT)
.mapToObj(l -> new Put(Bytes.toBytes(String.format("%03d", l)))

View File

@ -61,7 +61,7 @@ public abstract class AbstractTestAsyncTableScan {
}
TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
ASYNC_CONN.getRawTable(TABLE_NAME).putAll(IntStream.range(0, COUNT)
.mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
.addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))

View File

@ -93,7 +93,7 @@ public class TestAsyncAdmin {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 3000);
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
TEST_UTIL.startMiniCluster(1);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@AfterClass

View File

@ -71,7 +71,9 @@ public class TestAsyncNonMetaRegionLocator {
public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.getAdmin().setBalancerRunning(false, true);
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
SPLIT_KEYS = new byte[8][];
for (int i = 111; i < 999; i += 111) {

View File

@ -110,7 +110,9 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
conf.setInt(MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, MAX_ALLOWED);
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.getAdmin().setBalancerRunning(false, true);
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
.toArray(byte[][]::new);

View File

@ -85,7 +85,9 @@ public class TestAsyncRegionLocatorTimeout {
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
CONN = new AsyncConnectionImpl(conf, User.getCurrent());
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), User.getCurrent());
LOCATOR = CONN.getLocator();
}

View File

@ -66,7 +66,9 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
TEST_UTIL.getAdmin().setBalancerRunning(false, true);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), User.getCurrent());
}
@AfterClass
@ -82,8 +84,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName());
TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes(
TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
RawAsyncTable table = CONN.getRawTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
.setMaxRetries(30).build();
RawAsyncTable table = CONN.getRawTableBuilder(TABLE_NAME)
.setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(30).build();
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
// move back
@ -156,14 +158,14 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
void updateCachedLocation(HRegionLocation loc, Throwable exception) {
}
};
try (AsyncConnectionImpl mockedConn =
new AsyncConnectionImpl(CONN.getConfiguration(), User.getCurrent()) {
try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
@Override
AsyncRegionLocator getLocator() {
return mockedLocator;
}
}) {
RawAsyncTable table = mockedConn.getRawTableBuilder(TABLE_NAME)
.setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();

View File

@ -99,7 +99,7 @@ public class TestAsyncTable {
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@AfterClass

View File

@ -103,7 +103,7 @@ public class TestAsyncTableBatch {
for (int i = 111; i < 999; i += 111) {
SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
}
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@AfterClass

View File

@ -91,7 +91,7 @@ public class TestAsyncTableGetMultiThreaded {
}
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
TABLE = CONN.getRawTableBuilder(TABLE_NAME).setReadRpcTimeout(1, TimeUnit.SECONDS)
.setMaxRetries(1000).build();
TABLE.putAll(
@ -119,8 +119,8 @@ public class TestAsyncTableGetMultiThreaded {
public void test() throws IOException, InterruptedException, ExecutionException {
int numThreads = 20;
AtomicBoolean stop = new AtomicBoolean(false);
ExecutorService executor =
Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
ExecutorService executor = Executors.newFixedThreadPool(numThreads,
Threads.newDaemonThreadFactory("TestAsyncGet-"));
List<Future<?>> futures = new ArrayList<>();
IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
run(stop);

View File

@ -77,13 +77,14 @@ public class TestAsyncTableNoncedRetry {
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent()) {
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), User.getCurrent()) {
@Override
public NonceGenerator getNonceGenerator() {
return NONCE_GENERATOR;
}
};
}

View File

@ -59,7 +59,7 @@ public class TestAsyncTableScanRenewLease {
SCANNER_LEASE_TIMEOUT_PERIOD_MS);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
TABLE = CONN.getRawTable(TABLE_NAME);
TABLE.putAll(IntStream.range(0, 10).mapToObj(
i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))

View File

@ -53,7 +53,7 @@ public class TestAsyncTableScannerCloseWhileSuspending {
public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
TABLE = CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
TABLE.putAll(IntStream.range(0, 100).mapToObj(
i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))

View File

@ -92,7 +92,7 @@ public class TestZKAsyncRegistry {
@Test
public void test() throws InterruptedException, ExecutionException, IOException {
assertEquals(TEST_UTIL.getHBaseCluster().getClusterStatus().getClusterId(),
REGISTRY.getClusterId());
REGISTRY.getClusterId().get());
assertEquals(TEST_UTIL.getHBaseCluster().getClusterStatus().getServersSize(),
REGISTRY.getCurrentNrHRS().get().intValue());
assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),