HBASE-26294 Backport "HBASE-26181 Region server and master could use itself as ConnectionRegistry" to branch-2 (#3708)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
Duo Zhang 2021-10-05 22:11:01 +08:00 committed by GitHub
parent 9a33e234e7
commit 706082d513
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 122 additions and 44 deletions

View File

@ -89,11 +89,13 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
@ -243,6 +245,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
* @param conf Configuration object * @param conf Configuration object
*/ */
ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException { ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException {
this(conf, pool, user, null);
}
/**
* Constructor, for creating cluster connection with provided ConnectionRegistry.
*/
ConnectionImplementation(Configuration conf, ExecutorService pool, User user,
ConnectionRegistry registry) throws IOException {
this.conf = conf; this.conf = conf;
this.user = user; this.user = user;
if (user != null && user.isLoginFromKeytab()) { if (user != null && user.isLoginFromKeytab()) {
@ -306,7 +316,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.conf.get(BufferedMutator.CLASSNAME_KEY); this.conf.get(BufferedMutator.CLASSNAME_KEY);
try { try {
if (registry == null) {
this.registry = ConnectionRegistryFactory.getRegistry(conf); this.registry = ConnectionRegistryFactory.getRegistry(conf);
} else {
this.registry = registry;
}
retrieveClusterId(); retrieveClusterId();
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
@ -348,7 +362,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
replicaSelectorClass, META_TABLE_NAME, getChoreService(), () -> { replicaSelectorClass, META_TABLE_NAME, getChoreService(), () -> {
int numOfReplicas = 1; int numOfReplicas = 1;
try { try {
RegionLocations metaLocations = registry.getMetaRegionLocations().get( RegionLocations metaLocations = this.registry.getMetaRegionLocations().get(
connectionConfig.getReadRpcTimeout(), TimeUnit.MILLISECONDS); connectionConfig.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
numOfReplicas = metaLocations.size(); numOfReplicas = metaLocations.size();
} catch (Exception e) { } catch (Exception e) {

View File

@ -24,8 +24,6 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException; import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -50,6 +48,18 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@ -57,16 +67,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Utility used by client connections. * Utility used by client connections.
@ -141,10 +141,10 @@ public final class ConnectionUtils {
private final AdminService.BlockingInterface localHostAdmin; private final AdminService.BlockingInterface localHostAdmin;
private final ClientService.BlockingInterface localHostClient; private final ClientService.BlockingInterface localHostClient;
private ShortCircuitingClusterConnection(Configuration conf, ExecutorService pool, User user, private ShortCircuitingClusterConnection(Configuration conf, User user,
ServerName serverName, AdminService.BlockingInterface admin, ServerName serverName, AdminService.BlockingInterface admin,
ClientService.BlockingInterface client) throws IOException { ClientService.BlockingInterface client, ConnectionRegistry registry) throws IOException {
super(conf, pool, user); super(conf, null, user, registry);
this.serverName = serverName; this.serverName = serverName;
this.localHostAdmin = admin; this.localHostAdmin = admin;
this.localHostClient = client; this.localHostClient = client;
@ -174,22 +174,21 @@ public final class ConnectionUtils {
* Creates a short-circuit connection that can bypass the RPC layer (serialization, * Creates a short-circuit connection that can bypass the RPC layer (serialization,
* deserialization, networking, etc..) when talking to a local server. * deserialization, networking, etc..) when talking to a local server.
* @param conf the current configuration * @param conf the current configuration
* @param pool the thread pool to use for batch operations
* @param user the user the connection is for * @param user the user the connection is for
* @param serverName the local server name * @param serverName the local server name
* @param admin the admin interface of the local server * @param admin the admin interface of the local server
* @param client the client interface of the local server * @param client the client interface of the local server
* @param registry the connection registry to be used, can be null
* @return an short-circuit connection. * @return an short-circuit connection.
* @throws IOException if IO failure occurred * @throws IOException if IO failure occurred
*/ */
public static ClusterConnection createShortCircuitConnection(final Configuration conf, public static ClusterConnection createShortCircuitConnection(final Configuration conf, User user,
ExecutorService pool, User user, final ServerName serverName, final ServerName serverName, final AdminService.BlockingInterface admin,
final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client) final ClientService.BlockingInterface client, ConnectionRegistry registry) throws IOException {
throws IOException {
if (user == null) { if (user == null) {
user = UserProvider.instantiate(conf).getCurrent(); user = UserProvider.instantiate(conf).getCurrent();
} }
return new ShortCircuitingClusterConnection(conf, pool, user, serverName, admin, client); return new ShortCircuitingClusterConnection(conf, user, serverName, admin, client, registry);
} }
/** /**

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Connection registry implementation for region server.
*/
@InterfaceAudience.Private
public class RegionServerRegistry implements ConnectionRegistry {
private final HRegionServer regionServer;
public RegionServerRegistry(HRegionServer regionServer) {
this.regionServer = regionServer;
}
@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
Optional<List<HRegionLocation>> locs =
regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
if (locs.isPresent()) {
List<HRegionLocation> list = locs.get();
if (list.isEmpty()) {
future.completeExceptionally(new IOException("no meta location available"));
} else {
future.complete(new RegionLocations(list));
}
} else {
future.completeExceptionally(new IOException("no meta location available"));
}
return future;
}
@Override
public CompletableFuture<String> getClusterId() {
return CompletableFuture.completedFuture(regionServer.getClusterId());
}
@Override
public CompletableFuture<ServerName> getActiveMaster() {
CompletableFuture<ServerName> future = new CompletableFuture<>();
Optional<ServerName> activeMaster = regionServer.getActiveMaster();
if (activeMaster.isPresent()) {
future.complete(activeMaster.get());
} else {
future.completeExceptionally(new IOException("no active master available"));
}
return future;
}
@Override
public void close() {
// nothing
}
}

View File

@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionServerRegistry;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.locking.EntityLock; import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient; import org.apache.hadoop.hbase.client.locking.LockServiceClient;
@ -876,27 +877,11 @@ public class HRegionServer extends Thread implements
} }
protected ClusterConnection createClusterConnection() throws IOException { protected ClusterConnection createClusterConnection() throws IOException {
Configuration conf = this.conf;
// We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
// - Decouples RS and master life cycles. RegionServers can continue be up independent of
// masters' availability.
// - Configuration management for region servers (cluster internal) is much simpler when adding
// new masters or removing existing masters, since only clients' config needs to be updated.
// - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
// other internal connections too.
conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
// Use server ZK cluster for server-issued connections, so we clone
// the conf and unset the client ZK related properties
conf = new Configuration(this.conf);
conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
}
// Create a cluster connection that when appropriate, can short-circuit and go directly to the // Create a cluster connection that when appropriate, can short-circuit and go directly to the
// local server if the request is to the local server bypassing RPC. Can be used for both local // local server if the request is to the local server bypassing RPC. Can be used for both local
// and remote invocations. // and remote invocations.
return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(), return ConnectionUtils.createShortCircuitConnection(conf, userProvider.getCurrent(),
serverName, rpcServices, rpcServices); serverName, rpcServices, rpcServices, new RegionServerRegistry(this));
} }
/** /**
@ -922,7 +907,6 @@ public class HRegionServer extends Thread implements
/** /**
* Setup our cluster connection if not already initialized. * Setup our cluster connection if not already initialized.
* @throws IOException
*/ */
protected synchronized void setupClusterConnection() throws IOException { protected synchronized void setupClusterConnection() throws IOException {
if (clusterConnection == null) { if (clusterConnection == null) {
@ -3863,8 +3847,8 @@ public class HRegionServer extends Thread implements
@Override @Override
public Connection createConnection(Configuration conf) throws IOException { public Connection createConnection(Configuration conf) throws IOException {
User user = UserProvider.instantiate(conf).getCurrent(); User user = UserProvider.instantiate(conf).getCurrent();
return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName, return ConnectionUtils.createShortCircuitConnection(conf, user, this.serverName,
this.rpcServices, this.rpcServices); this.rpcServices, this.rpcServices, new RegionServerRegistry(this));
} }
void executeProcedure(long procId, RSProcedureCallable callable) { void executeProcedure(long procId, RSProcedureCallable callable) {