diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index b14fb8214d7..776498acabc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
 import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
@@ -27,7 +26,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.HashedWheelTimer;
 import java.io.IOException;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -79,8 +77,6 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   final AsyncRegistry registry;
 
-  private final String clusterId;
-
   private final int rpcTimeout;
 
   private final RpcClient rpcClient;
@@ -103,17 +99,12 @@ class AsyncConnectionImpl implements AsyncConnection {
   private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
       new AtomicReference<>();
 
-  public AsyncConnectionImpl(Configuration conf, User user) {
+  public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
+      User user) {
     this.conf = conf;
     this.user = user;
     this.connConf = new AsyncConnectionConfiguration(conf);
-    this.registry = AsyncRegistryFactory.getRegistry(conf);
-    this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("cluster id came back null, using default " + CLUSTER_ID_DEFAULT);
-      }
-      return CLUSTER_ID_DEFAULT;
-    });
+    this.registry = registry;
     this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
@@ -145,11 +136,13 @@ class AsyncConnectionImpl implements AsyncConnection {
   }
 
   // we will override this method for testing retry caller, so do not remove this method.
+  @VisibleForTesting
   AsyncRegionLocator getLocator() {
     return locator;
   }
 
   // ditto
+  @VisibleForTesting
   public NonceGenerator getNonceGenerator() {
     return nonceGenerator;
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java
index 731cf09b6db..45700439448 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java
@@ -28,9 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
  * Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc..
  * All stuffs that may be related to zookeeper at client side are placed here.
  *
- * Most methods are executed asynchronously except getClusterId. It will be executed synchronously
- * and should be called only once when initialization.
- *
  * Internal use only.
  */
 @InterfaceAudience.Private
@@ -46,7 +43,7 @@ interface AsyncRegistry extends Closeable {
    *
    * The upper layer should store this value somewhere as it will not be change any more.
    */
-  String getClusterId();
+  CompletableFuture<String> getClusterId();
 
   /**
    * Get the number of 'running' regionservers.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index ca34211d1df..7cbcc20cd66 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.hadoop.conf.Configuration;
@@ -30,13 +31,12 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
-
 /**
- * A non-instantiable class that manages creation of {@link Connection}s.
- * Managing the lifecycle of the {@link Connection}s to the cluster is the responsibility of
- * the caller.
- * From a {@link Connection}, {@link Table} implementations are retrieved
- * with {@link Connection#getTable(TableName)}. Example:
+ * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
+ * the {@link Connection}s to the cluster is the responsibility of the caller. From a
+ * {@link Connection}, {@link Table} implementations are retrieved with
+ * {@link Connection#getTable(TableName)}. Example:
+ *
  * <pre>
  * Connection connection = ConnectionFactory.createConnection(config);
  * Table table = connection.getTable(TableName.valueOf("table1"));
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
  *
  * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
  * implementations.
- *
  * @see Connection
  * @since 0.99.0
  */
@@ -58,23 +57,20 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 @InterfaceStability.Evolving
 public class ConnectionFactory {
 
-  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
-      "hbase.client.async.connection.impl";
+  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = "hbase.client.async.connection.impl";
 
   /** No public c.tors */
   protected ConnectionFactory() {
   }
 
   /**
-   * Create a new Connection instance using default HBaseConfiguration. Connection
-   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
-   * The caller is responsible for calling {@link Connection#close()} on the returned
-   * connection instance.
+   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
+   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
+   * connection share zookeeper connection, meta cache, and connections to region servers and
+   * masters.
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
   *
-   * Typical usage:
   * <pre>
    * Connection connection = ConnectionFactory.createConnection();
    * Table table = connection.getTable(TableName.valueOf("mytable"));
@@ -96,13 +92,11 @@ public class ConnectionFactory {
   /**
    * Create a new Connection instance using the passed conf instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
-   * The caller is responsible for calling {@link Connection#close()} on the returned
-   * connection instance.
+   * created from returned connection share zookeeper connection, meta cache, and connections to
+   * region servers and masters.
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
   *
-   * Typical usage:
   * <pre>
    * Connection connection = ConnectionFactory.createConnection(conf);
    * Table table = connection.getTable(TableName.valueOf("mytable"));
@@ -125,13 +119,11 @@ public class ConnectionFactory {
   /**
    * Create a new Connection instance using the passed conf instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
-   * The caller is responsible for calling {@link Connection#close()} on the returned
-   * connection instance.
+   * created from returned connection share zookeeper connection, meta cache, and connections to
+   * region servers and masters.
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
   *
-   * Typical usage:
   * <pre>
    * Connection connection = ConnectionFactory.createConnection(conf);
    * Table table = connection.getTable(TableName.valueOf("mytable"));
@@ -156,13 +148,11 @@ public class ConnectionFactory {
   /**
    * Create a new Connection instance using the passed conf instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
-   * The caller is responsible for calling {@link Connection#close()} on the returned
-   * connection instance.
+   * created from returned connection share zookeeper connection, meta cache, and connections to
+   * region servers and masters.
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
   *
-   * Typical usage:
   * <pre>
    * Connection connection = ConnectionFactory.createConnection(conf);
    * Table table = connection.getTable(TableName.valueOf("table1"));
@@ -179,21 +169,18 @@ public class ConnectionFactory {
    * @param user the user the connection is for
    * @return Connection object for conf
    */
-  public static Connection createConnection(Configuration conf, User user)
-  throws IOException {
+  public static Connection createConnection(Configuration conf, User user) throws IOException {
     return createConnection(conf, null, user);
   }
 
   /**
    * Create a new Connection instance using the passed conf instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
-   * The caller is responsible for calling {@link Connection#close()} on the returned
-   * connection instance.
+   * created from returned connection share zookeeper connection, meta cache, and connections to
+   * region servers and masters.
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
   *
-   * Typical usage:
   * <pre>
    * Connection connection = ConnectionFactory.createConnection(conf);
    * Table table = connection.getTable(TableName.valueOf("table1"));
@@ -212,7 +199,7 @@ public class ConnectionFactory {
    * @return Connection object for conf
    */
   public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
-  throws IOException {
+      throws IOException {
     if (user == null) {
       UserProvider provider = UserProvider.instantiate(conf);
       user = provider.getCurrent();
@@ -228,9 +215,8 @@ public class ConnectionFactory {
     }
     try {
       // Default HCM#HCI is not accessible; make it so before invoking.
-      Constructor<?> constructor =
-        clazz.getDeclaredConstructor(Configuration.class,
-          ExecutorService.class, User.class);
+      Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
+        ExecutorService.class, User.class);
       constructor.setAccessible(true);
       return (Connection) constructor.newInstance(conf, pool, user);
     } catch (Exception e) {
@@ -241,9 +227,9 @@ public class ConnectionFactory {
   /**
    * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
    * @see #createAsyncConnection(Configuration)
-   * @return AsyncConnection object
+   * @return AsyncConnection object wrapped by CompletableFuture
    */
-  public static AsyncConnection createAsyncConnection() throws IOException {
+  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
     return createAsyncConnection(HBaseConfiguration.create());
   }
 
@@ -252,12 +238,20 @@ public class ConnectionFactory {
    * User object created by {@link UserProvider}. The given {@code conf} will also be used to
    * initialize the {@link UserProvider}.
    * @param conf configuration
-   * @return AsyncConnection object
+   * @return AsyncConnection object wrapped by CompletableFuture
    * @see #createAsyncConnection(Configuration, User)
    * @see UserProvider
    */
-  public static AsyncConnection createAsyncConnection(Configuration conf) throws IOException {
-    return createAsyncConnection(conf, UserProvider.instantiate(conf).getCurrent());
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
+    User user;
+    try {
+      user = UserProvider.instantiate(conf).getCurrent();
+    } catch (IOException e) {
+      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
+      future.completeExceptionally(e);
+      return future;
+    }
+    return createAsyncConnection(conf, user);
   }
 
   /**
@@ -273,17 +267,30 @@ public class ConnectionFactory {
    * as it is thread safe.
    * @param conf configuration
    * @param user the user the asynchronous connection is for
-   * @return AsyncConnection object
+   * @return AsyncConnection object wrapped by CompletableFuture
    * @throws IOException
    */
-  public static AsyncConnection createAsyncConnection(Configuration conf, User user)
-      throws IOException {
-    Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
-      AsyncConnectionImpl.class, AsyncConnection.class);
-    try {
-      return ReflectionUtils.newInstance(clazz, conf, user);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
+      User user) {
+    CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
+    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
+    registry.getClusterId().whenComplete((clusterId, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      if (clusterId == null) {
+        future.completeExceptionally(new IOException("clusterid came back null"));
+        return;
+      }
+      Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
+        AsyncConnectionImpl.class, AsyncConnection.class);
+      try {
+        future.complete(ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user));
+      } catch (Exception e) {
+        future.completeExceptionally(e);
+      }
+    });
+    return future;
   }
 }
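
Not part of the patch: a minimal usage sketch of the new factory signature, which completes its CompletableFuture only after the cluster id has been fetched through the AsyncRegistry. The configuration and the table name "test" are placeholders; the blocking get() style matches what the updated tests below do.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class CreateAsyncConnectionSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Blocking style, as in the updated tests: wait for the cluster id lookup to finish.
        AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get();
        conn.getRawTable(TableName.valueOf("test"));
        conn.close();

        // Fully asynchronous style: chain on the returned CompletableFuture instead of blocking.
        ConnectionFactory.createAsyncConnection(conf).whenComplete((asyncConn, error) -> {
          if (error != null) {
            // Setup failed, e.g. the cluster id could not be read from zookeeper.
            error.printStackTrace();
          } else {
            asyncConn.getRawTable(TableName.valueOf("test"));
          }
        });
      }
    }
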
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
index c76aa3ec0a1..47b68e11f99 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
@@ -53,8 +54,7 @@ import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.zookeeper.data.Stat;
 
 /**
- * Cache the cluster registry data in memory and use zk watcher to update. The only exception is
- * {@link #getClusterId()}, it will fetch the data from zk directly.
+ * Fetch the registry data from zookeeper.
  */
 @InterfaceAudience.Private
 class ZKAsyncRegistry implements AsyncRegistry {
@@ -79,26 +79,6 @@ class ZKAsyncRegistry implements AsyncRegistry {
     this.zk.start();
   }
 
-  @Override
-  public String getClusterId() {
-    try {
-      byte[] data = zk.getData().forPath(znodePaths.clusterIdZNode);
-      if (data == null || data.length == 0) {
-        return null;
-      }
-      data = removeMetaData(data);
-      return ClusterId.parseFrom(data).toString();
-    } catch (Exception e) {
-      LOG.warn("failed to get cluster id", e);
-      return null;
-    }
-  }
-
-  @Override
-  public void close() {
-    zk.close();
-  }
-
   private interface CuratorEventProcessor<T> {
     T process(CuratorEvent event) throws Exception;
   }
@@ -120,6 +100,20 @@ class ZKAsyncRegistry implements AsyncRegistry {
     return future;
   }
 
+  private static String getClusterId(CuratorEvent event) throws DeserializationException {
+    byte[] data = event.getData();
+    if (data == null || data.length == 0) {
+      return null;
+    }
+    data = removeMetaData(data);
+    return ClusterId.parseFrom(data).toString();
+  }
+
+  @Override
+  public CompletableFuture<String> getClusterId() {
+    return exec(zk.getData(), znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId);
+  }
+
   private static ZooKeeperProtos.MetaRegionServer getMetaProto(CuratorEvent event)
       throws IOException {
     byte[] data = event.getData();
@@ -249,4 +243,9 @@ class ZKAsyncRegistry implements AsyncRegistry {
     return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
         .thenApply(proto -> proto != null ? proto.getInfoPort() : 0);
   }
+
+  @Override
+  public void close() {
+    zk.close();
+  }
 }
\ No newline at end of file
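
Not part of the patch: getClusterId() now goes through the same exec() helper as the other registry reads, i.e. a background zookeeper fetch whose payload is parsed (ClusterId.parseFrom) and delivered through a CompletableFuture. A schematic sketch of that callback-to-future bridge, using hypothetical stand-in names rather than the real Curator API:

    import java.util.concurrent.CompletableFuture;
    import java.util.function.BiConsumer;

    final class CallbackToFutureSketch {

      // Stand-in for an asynchronous read that reports its result through a callback.
      static void readAsync(String path, BiConsumer<byte[], Throwable> callback) {
        callback.accept(new byte[0], null);
      }

      interface Parser<T> {
        T parse(byte[] data) throws Exception;
      }

      // Bridge the callback into a CompletableFuture, applying the parse step on success
      // and routing both fetch and parse failures into completeExceptionally().
      static <T> CompletableFuture<T> exec(String path, Parser<T> parser) {
        CompletableFuture<T> future = new CompletableFuture<>();
        readAsync(path, (data, error) -> {
          if (error != null) {
            future.completeExceptionally(error);
            return;
          }
          try {
            future.complete(parser.parse(data));
          } catch (Exception e) {
            future.completeExceptionally(e);
          }
        });
        return future;
      }
    }
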
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
index 1274dd5205b..389aaaf91d9 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
@@ -68,7 +68,7 @@ public class TestAsyncAggregationClient {
       splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
     }
     UTIL.createTable(TABLE_NAME, CF, splitKeys);
-    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration());
+    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
     TABLE = CONN.getRawTable(TABLE_NAME);
     TABLE.putAll(LongStream.range(0, COUNT)
         .mapToObj(l -> new Put(Bytes.toBytes(String.format("%03d", l)))
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
index 7bab8948c62..73e8f480102 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
@@ -61,7 +61,7 @@ public abstract class AbstractTestAsyncTableScan {
     }
     TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
     ASYNC_CONN.getRawTable(TABLE_NAME).putAll(IntStream.range(0, COUNT)
         .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
             .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
index 462a1d91c7d..467f6c9f5e3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
@@ -93,7 +93,7 @@ public class TestAsyncAdmin {
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 3000);
     TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
     TEST_UTIL.startMiniCluster(1);
-    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 
   @AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index fa1346a40b1..e0a32d11fb0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -71,7 +71,9 @@ public class TestAsyncNonMetaRegionLocator {
   public static void setUp() throws Exception {
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.getAdmin().setBalancerRunning(false, true);
-    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
+    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
+        registry.getClusterId().get(), User.getCurrent());
     LOCATOR = new AsyncNonMetaRegionLocator(CONN);
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index 3918dc9db70..fd73ea12017 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -110,7 +110,9 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
     conf.setInt(MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, MAX_ALLOWED);
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.getAdmin().setBalancerRunning(false, true);
-    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
+    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
+        registry.getClusterId().get(), User.getCurrent());
     LOCATOR = new AsyncNonMetaRegionLocator(CONN);
     SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
         .toArray(byte[][]::new);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
index dfefcc74201..1e79488b230 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
@@ -85,7 +85,9 @@ public class TestAsyncRegionLocatorTimeout {
     TEST_UTIL.startMiniCluster(1);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    CONN = new AsyncConnectionImpl(conf, User.getCurrent());
+    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
+        registry.getClusterId().get(), User.getCurrent());
     LOCATOR = CONN.getLocator();
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index 7f544495162..fa7ff416189 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -66,7 +66,9 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     TEST_UTIL.getAdmin().setBalancerRunning(false, true);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
+    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
+        registry.getClusterId().get(), User.getCurrent());
   }
 
   @AfterClass
@@ -82,8 +84,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName());
     TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes(
       TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
-    RawAsyncTable table = CONN.getRawTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
-        .setMaxRetries(30).build();
+    RawAsyncTable table = CONN.getRawTableBuilder(TABLE_NAME)
+        .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(30).build();
     table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
 
     // move back
@@ -156,14 +158,14 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
           void updateCachedLocation(HRegionLocation loc, Throwable exception) {
           }
         };
-    try (AsyncConnectionImpl mockedConn =
-        new AsyncConnectionImpl(CONN.getConfiguration(), User.getCurrent()) {
+    try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
+        CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
 
-          @Override
-          AsyncRegionLocator getLocator() {
-            return mockedLocator;
-          }
-        }) {
+      @Override
+      AsyncRegionLocator getLocator() {
+        return mockedLocator;
+      }
+    }) {
       RawAsyncTable table = mockedConn.getRawTableBuilder(TABLE_NAME)
           .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
       table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
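
Not part of the patch: the tests above that construct AsyncConnectionImpl directly now resolve the registry and cluster id first. A condensed sketch of that setup, placed in the same package because these types are package-private:

    package org.apache.hadoop.hbase.client;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.security.User;

    public class AsyncConnectionImplSetupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
        // Block until zookeeper answers; the tests call get() the same way.
        String clusterId = registry.getClusterId().get();
        try (AsyncConnectionImpl conn =
            new AsyncConnectionImpl(conf, registry, clusterId, User.getCurrent())) {
          // Exercise conn.getLocator(), conn.getRawTable(...), etc.
        }
      }
    }
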
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index bb6cc2d34bb..593c88ea656 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -99,7 +99,7 @@ public class TestAsyncTable {
     TEST_UTIL.startMiniCluster(1);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 
   @AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
index 308b9e51a0d..f2a411636ab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -103,7 +103,7 @@ public class TestAsyncTableBatch {
     for (int i = 111; i < 999; i += 111) {
       SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
     }
-    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 
   @AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
index 6b1a74fb5ba..913c2e978a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -91,7 +91,7 @@ public class TestAsyncTableGetMultiThreaded {
     }
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
     TABLE = CONN.getRawTableBuilder(TABLE_NAME).setReadRpcTimeout(1, TimeUnit.SECONDS)
         .setMaxRetries(1000).build();
     TABLE.putAll(
@@ -119,8 +119,8 @@ public class TestAsyncTableGetMultiThreaded {
   public void test() throws IOException, InterruptedException, ExecutionException {
     int numThreads = 20;
     AtomicBoolean stop = new AtomicBoolean(false);
-    ExecutorService executor =
-        Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads,
+      Threads.newDaemonThreadFactory("TestAsyncGet-"));
     List<Future<?>> futures = new ArrayList<>();
     IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
       run(stop);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index ea999f90e77..3f7d1434662 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -77,13 +77,14 @@ public class TestAsyncTableNoncedRetry {
     TEST_UTIL.startMiniCluster(1);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent()) {
+    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+    ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
+        registry.getClusterId().get(), User.getCurrent()) {
 
       @Override
       public NonceGenerator getNonceGenerator() {
         return NONCE_GENERATOR;
       }
-
     };
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java
index a70b8d20975..c711f30278d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java
@@ -59,7 +59,7 @@ public class TestAsyncTableScanRenewLease {
       SCANNER_LEASE_TIMEOUT_PERIOD_MS);
     TEST_UTIL.startMiniCluster(1);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
-    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
     TABLE = CONN.getRawTable(TABLE_NAME);
     TABLE.putAll(IntStream.range(0, 10).mapToObj(
       i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
index 0f132d1471b..a0ae1507fae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
@@ -53,7 +53,7 @@ public class TestAsyncTableScannerCloseWhileSuspending {
   public static void setUp() throws Exception {
     TEST_UTIL.startMiniCluster(1);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
-    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
     TABLE = CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
     TABLE.putAll(IntStream.range(0, 100).mapToObj(
       i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
index 11b9fd40faa..b092a254cc5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
@@ -92,7 +92,7 @@ public class TestZKAsyncRegistry {
   @Test
   public void test() throws InterruptedException, ExecutionException, IOException {
     assertEquals(TEST_UTIL.getHBaseCluster().getClusterStatus().getClusterId(),
-      REGISTRY.getClusterId());
+      REGISTRY.getClusterId().get());
     assertEquals(TEST_UTIL.getHBaseCluster().getClusterStatus().getServersSize(),
       REGISTRY.getCurrentNrHRS().get().intValue());
     assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),