diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 18ebbb0b0d2..fef193cac27 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -89,11 +89,13 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; @@ -243,6 +245,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable { * @param conf Configuration object */ ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException { + this(conf, pool, user, null); + } + + /** + * Constructor, for creating cluster connection with provided ConnectionRegistry. + */ + ConnectionImplementation(Configuration conf, ExecutorService pool, User user, + ConnectionRegistry registry) throws IOException { this.conf = conf; this.user = user; if (user != null && user.isLoginFromKeytab()) { @@ -306,7 +316,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.conf.get(BufferedMutator.CLASSNAME_KEY); try { - this.registry = ConnectionRegistryFactory.getRegistry(conf); + if (registry == null) { + this.registry = ConnectionRegistryFactory.getRegistry(conf); + } else { + this.registry = registry; + } retrieveClusterId(); this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); @@ -348,7 +362,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { replicaSelectorClass, META_TABLE_NAME, getChoreService(), () -> { int numOfReplicas = 1; try { - RegionLocations metaLocations = registry.getMetaRegionLocations().get( + RegionLocations metaLocations = this.registry.getMetaRegionLocations().get( connectionConfig.getReadRpcTimeout(), TimeUnit.MILLISECONDS); numOfReplicas = metaLocations.size(); } catch (Exception e) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 3d97a577fcb..8c305b6d2ef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -24,8 +24,6 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Arrays; import java.util.List; @@ -50,6 +48,18 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.DNS; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.apache.hbase.thirdparty.io.netty.util.Timer; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; @@ -57,16 +67,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.DNS; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; -import org.apache.hbase.thirdparty.io.netty.util.Timer; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Utility used by client connections. @@ -141,10 +141,10 @@ public final class ConnectionUtils { private final AdminService.BlockingInterface localHostAdmin; private final ClientService.BlockingInterface localHostClient; - private ShortCircuitingClusterConnection(Configuration conf, ExecutorService pool, User user, + private ShortCircuitingClusterConnection(Configuration conf, User user, ServerName serverName, AdminService.BlockingInterface admin, - ClientService.BlockingInterface client) throws IOException { - super(conf, pool, user); + ClientService.BlockingInterface client, ConnectionRegistry registry) throws IOException { + super(conf, null, user, registry); this.serverName = serverName; this.localHostAdmin = admin; this.localHostClient = client; @@ -174,22 +174,21 @@ public final class ConnectionUtils { * Creates a short-circuit connection that can bypass the RPC layer (serialization, * deserialization, networking, etc..) when talking to a local server. * @param conf the current configuration - * @param pool the thread pool to use for batch operations * @param user the user the connection is for * @param serverName the local server name * @param admin the admin interface of the local server * @param client the client interface of the local server + * @param registry the connection registry to be used, can be null * @return an short-circuit connection. * @throws IOException if IO failure occurred */ - public static ClusterConnection createShortCircuitConnection(final Configuration conf, - ExecutorService pool, User user, final ServerName serverName, - final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client) - throws IOException { + public static ClusterConnection createShortCircuitConnection(final Configuration conf, User user, + final ServerName serverName, final AdminService.BlockingInterface admin, + final ClientService.BlockingInterface client, ConnectionRegistry registry) throws IOException { if (user == null) { user = UserProvider.instantiate(conf).getCurrent(); } - return new ShortCircuitingClusterConnection(conf, pool, user, serverName, admin, client); + return new ShortCircuitingClusterConnection(conf, user, serverName, admin, client, registry); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java new file mode 100644 index 00000000000..cdfbb6d925f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Connection registry implementation for region server. + */ +@InterfaceAudience.Private +public class RegionServerRegistry implements ConnectionRegistry { + + private final HRegionServer regionServer; + + public RegionServerRegistry(HRegionServer regionServer) { + this.regionServer = regionServer; + } + + @Override + public CompletableFuture getMetaRegionLocations() { + CompletableFuture future = new CompletableFuture<>(); + Optional> locs = + regionServer.getMetaRegionLocationCache().getMetaRegionLocations(); + if (locs.isPresent()) { + List list = locs.get(); + if (list.isEmpty()) { + future.completeExceptionally(new IOException("no meta location available")); + } else { + future.complete(new RegionLocations(list)); + } + } else { + future.completeExceptionally(new IOException("no meta location available")); + } + return future; + } + + @Override + public CompletableFuture getClusterId() { + return CompletableFuture.completedFuture(regionServer.getClusterId()); + } + + @Override + public CompletableFuture getActiveMaster() { + CompletableFuture future = new CompletableFuture<>(); + Optional activeMaster = regionServer.getActiveMaster(); + if (activeMaster.isPresent()) { + future.complete(activeMaster.get()); + } else { + future.completeExceptionally(new IOException("no active master available")); + } + return future; + } + + @Override + public void close() { + // nothing + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 1221374d81d..56e21913d4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.RegionServerRegistry; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.locking.EntityLock; import org.apache.hadoop.hbase.client.locking.LockServiceClient; @@ -876,27 +877,11 @@ public class HRegionServer extends Thread implements } protected ClusterConnection createClusterConnection() throws IOException { - Configuration conf = this.conf; - // We use ZKConnectionRegistry for all the internal communication, primarily for these reasons: - // - Decouples RS and master life cycles. RegionServers can continue be up independent of - // masters' availability. - // - Configuration management for region servers (cluster internal) is much simpler when adding - // new masters or removing existing masters, since only clients' config needs to be updated. - // - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for - // other internal connections too. - conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, - HConstants.ZK_CONNECTION_REGISTRY_CLASS); - if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) { - // Use server ZK cluster for server-issued connections, so we clone - // the conf and unset the client ZK related properties - conf = new Configuration(this.conf); - conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM); - } // Create a cluster connection that when appropriate, can short-circuit and go directly to the // local server if the request is to the local server bypassing RPC. Can be used for both local // and remote invocations. - return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(), - serverName, rpcServices, rpcServices); + return ConnectionUtils.createShortCircuitConnection(conf, userProvider.getCurrent(), + serverName, rpcServices, rpcServices, new RegionServerRegistry(this)); } /** @@ -922,7 +907,6 @@ public class HRegionServer extends Thread implements /** * Setup our cluster connection if not already initialized. - * @throws IOException */ protected synchronized void setupClusterConnection() throws IOException { if (clusterConnection == null) { @@ -3863,8 +3847,8 @@ public class HRegionServer extends Thread implements @Override public Connection createConnection(Configuration conf) throws IOException { User user = UserProvider.instantiate(conf).getCurrent(); - return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName, - this.rpcServices, this.rpcServices); + return ConnectionUtils.createShortCircuitConnection(conf, user, this.serverName, + this.rpcServices, this.rpcServices, new RegionServerRegistry(this)); } void executeProcedure(long procId, RSProcedureCallable callable) {