HBASE-26306 Backport "HBASE-26220 Use P2P communicate between region servers to sync the list for bootstrap node" to branch-2 (#3727)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
Duo Zhang 2021-10-10 22:56:12 +08:00 committed by GitHub
parent 1d8570cde1
commit 10584d70d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 538 additions and 220 deletions

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MasterNotRunningException;
@ -323,4 +324,15 @@ public interface ClusterConnection extends Connection {
* supports cell blocks.
*/
boolean hasCellBlockSupport();
/**
* Get live region servers from master.
* @param masterAddrTracker supplies the address of the master the request is sent to
* @param count the number of region servers asked for; it is passed to master as the
*          {@code count} field of the request
* @return the region servers carried in the master's response
* @throws IOException if the request to master fails
*/
List<ServerName> getLiveRegionServers(Supplier<ServerName> masterAddrTracker, int count)
throws IOException;
/**
* Get the bootstrap node list of another region server.
* @param regionServer the region server to ask for its bootstrap node list
* @return the bootstrap nodes reported by the given region server
* @throws IOException if the request to the region server fails
*/
List<ServerName> getAllBootstrapNodes(ServerName regionServer) throws IOException;
}

View File

@ -49,6 +49,8 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.CallQueueTooBigException;
@ -104,6 +106,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Get
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@ -134,6 +139,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaSta
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
@ -2216,4 +2224,37 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
throw new IOException(cause);
}
}
/**
 * Asks the master located through {@code masterAddrTracker} for up to {@code count} live region
 * servers and converts the protobuf reply into {@link ServerName}s.
 * @throws IOException the translated cause of the RPC failure, wrapped in a new
 *           {@link IOException} when it is not one already
 */
@Override
public List<ServerName> getLiveRegionServers(Supplier<ServerName> masterAddrTracker, int count)
  throws IOException {
  ServerName masterAddress = masterAddrTracker.get();
  RegionServerStatusService.BlockingInterface masterStub = RegionServerStatusService
    .newBlockingStub(rpcClient.createBlockingRpcChannel(masterAddress, user, rpcTimeout));
  GetLiveRegionServersResponse response;
  try {
    GetLiveRegionServersRequest request =
      GetLiveRegionServersRequest.newBuilder().setCount(count).build();
    // no RpcController is passed to the stub
    response = masterStub.getLiveRegionServers(null, request);
  } catch (ServiceException se) {
    // unwrap the protobuf ServiceException; rethrow as is when it already is an IOException
    Throwable cause = ConnectionUtils.translateException(se);
    Throwables.propagateIfPossible(cause, IOException.class);
    throw new IOException(cause);
  }
  return response.getServerList().stream().map(ProtobufUtil::toServerName)
    .collect(Collectors.toList());
}
/**
 * Fetches the complete bootstrap node list held by {@code regionServer} and converts the protobuf
 * reply into {@link ServerName}s.
 * @throws IOException the translated cause of the RPC failure, wrapped in a new
 *           {@link IOException} when it is not one already
 */
@Override
public List<ServerName> getAllBootstrapNodes(ServerName regionServer) throws IOException {
  BootstrapNodeService.BlockingInterface peerStub = BootstrapNodeService
    .newBlockingStub(rpcClient.createBlockingRpcChannel(regionServer, user, rpcTimeout));
  GetAllBootstrapNodesResponse response;
  try {
    // the request carries no fields, so the shared default instance is enough
    GetAllBootstrapNodesRequest request = GetAllBootstrapNodesRequest.getDefaultInstance();
    response = peerStub.getAllBootstrapNodes(null, request);
  } catch (ServiceException se) {
    // unwrap the protobuf ServiceException; rethrow as is when it already is an IOException
    Throwable cause = ConnectionUtils.translateException(se);
    Throwables.propagateIfPossible(cause, IOException.class);
    throw new IOException(cause);
  }
  return response.getNodeList().stream().map(ProtobufUtil::toServerName)
    .collect(Collectors.toList());
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
// The protos for exchanging bootstrap nodes between region servers.
package hbase.pb;
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "BootstrapNodeProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "HBase.proto";
// Request for the full bootstrap node list of a region server; it carries no fields.
message GetAllBootstrapNodesRequest {
}
// Response carrying the bootstrap nodes currently held by the responding region server.
message GetAllBootstrapNodesResponse {
// One entry per bootstrap node.
repeated ServerName node = 1;
}
// Service used by region servers to exchange their bootstrap node lists with each other.
service BootstrapNodeService {
// Returns all the bootstrap nodes held by the region server that receives the call.
rpc GetAllBootstrapNodes(GetAllBootstrapNodesRequest)
returns(GetAllBootstrapNodesResponse);
}

View File

@ -175,6 +175,15 @@ message FileArchiveNotificationRequest {
message FileArchiveNotificationResponse {
}
// Request sent to master to get some live region servers.
message GetLiveRegionServersRequest {
// Upper bound on the number of region servers master puts into the response.
required uint32 count = 1;
}
// Response of master carrying a random selection of the live region servers.
message GetLiveRegionServersResponse {
// The selected region servers, at most as many as the request's count.
repeated ServerName server = 1;
// The number of live region servers master selected from, which can be larger than the number
// of entries in 'server'.
required uint32 total = 2;
}
service RegionServerStatusService {
/** Called when a region server first starts. */
rpc RegionServerStartup(RegionServerStartupRequest)
@ -217,4 +226,8 @@ service RegionServerStatusService {
/** Reports files that were moved to the archive directory for space quotas */
rpc ReportFileArchival(FileArchiveNotificationRequest)
returns(FileArchiveNotificationResponse);
/** Get some live region servers to be used as seed for bootstrap nodes */
rpc GetLiveRegionServers(GetLiveRegionServersRequest)
returns(GetLiveRegionServersResponse);
}

View File

@ -2757,7 +2757,7 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
public Iterator<ServerName> getRegionServers() {
public Iterator<ServerName> getBootstrapNodes() {
return regionServerTracker.getRegionServers().iterator();
}
@ -3959,4 +3959,8 @@ public class HMaster extends HRegionServer implements MasterServices {
MasterRegion getMasterRegion() {
return masterRegion;
}
/**
 * Returns the region servers currently reported by this master's region server tracker.
 */
public Collection<ServerName> getLiveRegionServers() {
  Collection<ServerName> trackedServers = regionServerTracker.getRegionServers();
  return trackedServers;
}
}

View File

@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -122,8 +123,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@ -327,6 +326,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
@ -3045,4 +3046,15 @@ public class MasterRpcServices extends RSRpcServices implements
.addAllBalancerRejection(balancerRejections).build();
}
/**
 * Serves the request of a region server for some live region servers to seed its bootstrap node
 * list. The live servers are shuffled so that every caller gets a different random selection;
 * at most {@code request.getCount()} of them are returned, together with the total number of
 * live region servers.
 */
@Override
public GetLiveRegionServersResponse getLiveRegionServers(RpcController controller,
  GetLiveRegionServersRequest request) throws ServiceException {
  // copy first, the shuffle must not touch the collection owned by master
  List<ServerName> candidates = new ArrayList<>(master.getLiveRegionServers());
  Collections.shuffle(candidates, ThreadLocalRandom.current());
  GetLiveRegionServersResponse.Builder response = GetLiveRegionServersResponse.newBuilder();
  response.setTotal(candidates.size());
  candidates.stream().limit(request.getCount())
    .forEach(server -> response.addServer(ProtobufUtil.toServerName(server)));
  return response.build();
}
}

View File

@ -0,0 +1,216 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * Manage the bootstrap node list at region server side.
 * <p/>
 * It will request master first to get the initial set of bootstrap nodes(a sub set of live region
 * servers), and then it will exchange the bootstrap nodes with other bootstrap nodes. In most
 * cases, if the cluster is stable, we do not need to request master again until we reach the
 * request master interval. And if the current number of bootstrap nodes is not enough, we will
 * request master soon.
 * <p/>
 * The algorithm is very simple, as we will always fallback to request master. The trick here is
 * that, if we can not get enough bootstrap nodes from master, then the cluster will be small, so it
 * will not put too much pressure on master if we always request master. And for large clusters, we
 * will soon get enough bootstrap nodes and stop requesting master.
 * <p/>
 * All the refresh work runs as a chain of one-shot tasks on a single threaded scheduled executor,
 * where every task schedules its successor. The node list itself is published through a volatile
 * reference to an unmodifiable list, so {@link #getBootstrapNodes()} can be called from any
 * thread.
 */
@InterfaceAudience.Private
public class BootstrapNodeManager {

  private static final Logger LOG = LoggerFactory.getLogger(BootstrapNodeManager.class);

  public static final String REQUEST_MASTER_INTERVAL_SECS =
    "hbase.server.bootstrap.request_master_interval.secs";

  // default request every 10 minutes
  public static final long DEFAULT_REQUEST_MASTER_INTERVAL_SECS = TimeUnit.MINUTES.toSeconds(10);

  public static final String REQUEST_MASTER_MIN_INTERVAL_SECS =
    "hbase.server.bootstrap.request_master_min_interval.secs";

  // default 30 seconds
  public static final long DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS = 30;

  public static final String REQUEST_REGIONSERVER_INTERVAL_SECS =
    "hbase.server.bootstrap.request_regionserver_interval.secs";

  // default request every 30 seconds
  public static final long DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS = 30;

  // the maximum fraction of a delay which is randomly added on top of it
  private static final float JITTER = 0.2f;

  // always an unmodifiable list; replaced as a whole by the refresh tasks
  private volatile List<ServerName> nodes = Collections.emptyList();

  private final ClusterConnection conn;

  private final MasterAddressTracker masterAddrTracker;

  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(getClass().getSimpleName()).build());

  private final long requestMasterIntervalSecs;

  private final long requestMasterMinIntervalSecs;

  private final long requestRegionServerIntervalSecs;

  private final int maxNodeCount;

  private final RetryCounterFactory retryCounterFactory;

  // the two fields below are only accessed by the tasks running on the single threaded executor

  // non-null only while requests to master keep failing
  private RetryCounter retryCounter;

  private long lastRequestMasterTime;

  /**
   * Reads the intervals and the node limit from the configuration of {@code conn} and schedules
   * the first request to master.
   * @param conn              the connection used to talk to master and to other region servers
   * @param masterAddrTracker used to find out the address of master for every request to master
   */
  public BootstrapNodeManager(ClusterConnection conn, MasterAddressTracker masterAddrTracker) {
    this.conn = conn;
    this.masterAddrTracker = masterAddrTracker;
    Configuration conf = conn.getConfiguration();
    requestMasterIntervalSecs =
      conf.getLong(REQUEST_MASTER_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_INTERVAL_SECS);
    requestMasterMinIntervalSecs =
      conf.getLong(REQUEST_MASTER_MIN_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS);
    requestRegionServerIntervalSecs =
      conf.getLong(REQUEST_REGIONSERVER_INTERVAL_SECS, DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS);
    maxNodeCount = conf.getInt(RSRpcServices.CLIENT_BOOTSTRAP_NODE_LIMIT,
      RSRpcServices.DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
    retryCounterFactory = new RetryCounterFactory(
      new RetryConfig().setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()).setJitter(JITTER)
        .setSleepInterval(requestMasterMinIntervalSecs).setMaxSleepTime(requestMasterIntervalSecs)
        .setTimeUnit(TimeUnit.SECONDS));
    executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
      TimeUnit.SECONDS);
  }

  /**
   * Adds a random jitter of up to {@link #JITTER} times {@code delay} to the given delay, so that
   * the region servers of a cluster do not all send their requests at the same time.
   */
  private long getDelay(long delay) {
    long jitterDelay = (long) (delay * ThreadLocalRandom.current().nextFloat() * JITTER);
    return delay + jitterDelay;
  }

  /**
   * Replaces the node list with live region servers fetched from master, and then schedules the
   * next task: a retry with backoff on failure, another request to master if master returned less
   * than {@code maxNodeCount} servers, otherwise an exchange with a region server.
   */
  private void getFromMaster() {
    List<ServerName> liveRegionServers;
    try {
      // get 2 times number of node
      liveRegionServers =
        conn.getLiveRegionServers(() -> masterAddrTracker.getMasterAddress(), maxNodeCount * 2);
    } catch (IOException | RuntimeException e) {
      // Unchecked exceptions must be handled here too. An exception escaping from a one-shot
      // scheduled task is silently stored in its future, and as every task schedules its
      // successor, the refresh chain would stop forever.
      LOG.warn("failed to get live region servers from master", e);
      if (retryCounter == null) {
        retryCounter = retryCounterFactory.create();
      }
      executor.schedule(this::getFromMaster, retryCounter.getBackoffTimeAndIncrementAttempts(),
        TimeUnit.SECONDS);
      return;
    }
    retryCounter = null;
    lastRequestMasterTime = EnvironmentEdgeManager.currentTime();
    this.nodes = Collections.unmodifiableList(liveRegionServers);
    if (liveRegionServers.size() < maxNodeCount) {
      // If the number of live region servers is small, it means the cluster is small, so requesting
      // master with a higher frequency will not be a big problem, so here we will always request
      // master to get the live region servers as bootstrap nodes.
      executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
        TimeUnit.SECONDS);
      return;
    }
    // schedule tasks to exchange the bootstrap nodes with other region servers.
    executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
      TimeUnit.SECONDS);
  }

  /**
   * Asks a randomly selected node of the current list for its bootstrap nodes and merges the
   * answer into the current list, keeping at most two times {@code maxNodeCount} nodes. A node
   * which fails to answer is removed from the list. Falls back to requesting master if master has
   * not been requested for {@code requestMasterIntervalSecs} or if too few nodes are left.
   */
  // this method is also used to test whether a given region server is still alive.
  private void getFromRegionServer() {
    if (EnvironmentEdgeManager.currentTime() - lastRequestMasterTime >= TimeUnit.SECONDS
      .toMillis(requestMasterIntervalSecs)) {
      // schedule a get from master task immediately if haven't request master for more than
      // requestMasterIntervalSecs
      executor.execute(this::getFromMaster);
      return;
    }
    List<ServerName> currentList = this.nodes;
    if (currentList.isEmpty()) {
      // There is nobody to ask, which can happen if the node limit is configured to be
      // non-positive. Picking a random element from an empty list would throw and stop the
      // refresh chain, so go back to master after the usual delay instead.
      executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
        TimeUnit.SECONDS);
      return;
    }
    ServerName peer = currentList.get(ThreadLocalRandom.current().nextInt(currentList.size()));
    List<ServerName> otherList;
    try {
      otherList = conn.getAllBootstrapNodes(peer);
    } catch (IOException | RuntimeException e) {
      // see the comment in getFromMaster on why unchecked exceptions are handled here too
      LOG.warn("failed to request region server {}", peer, e);
      // remove this region server from the list since it can not respond successfully
      List<ServerName> newList = currentList.stream().filter(sn -> !sn.equals(peer))
        .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
      this.nodes = newList;
      if (newList.size() < maxNodeCount) {
        // schedule a get from master task immediately
        executor.execute(this::getFromMaster);
      } else {
        executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
          TimeUnit.SECONDS);
      }
      return;
    }
    // randomly select new live region server list
    Set<ServerName> newRegionServers = new HashSet<>(currentList);
    newRegionServers.addAll(otherList);
    List<ServerName> newList = new ArrayList<>(newRegionServers);
    Collections.shuffle(newList, ThreadLocalRandom.current());
    int expectedListSize = maxNodeCount * 2;
    if (newList.size() <= expectedListSize) {
      this.nodes = Collections.unmodifiableList(newList);
    } else {
      // copy the sub list so that the published list does not keep the larger list alive
      this.nodes =
        Collections.unmodifiableList(new ArrayList<>(newList.subList(0, expectedListSize)));
    }
    // schedule a new get from region server task, with jitter like all the other scheduled tasks
    executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
      TimeUnit.SECONDS);
  }

  /**
   * Stops refreshing the node list by shutting down the executor immediately.
   */
  public void stop() {
    executor.shutdownNow();
  }

  /**
   * Returns the current bootstrap nodes as an unmodifiable list, which is empty until the first
   * request to master succeeds.
   */
  public List<ServerName> getBootstrapNodes() {
    return nodes;
  }
}

View File

@ -181,7 +181,6 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -432,10 +431,6 @@ public class HRegionServer extends Thread implements
* entries. Used for serving ClientMetaService.
*/
private final MetaRegionLocationCache metaRegionLocationCache;
/**
* Cache for all the region servers in the cluster. Used for serving ClientMetaService.
*/
private final RegionServerAddressTracker regionServerAddressTracker;
// Cluster Status Tracker
protected final ClusterStatusTracker clusterStatusTracker;
@ -567,6 +562,8 @@ public class HRegionServer extends Thread implements
*/
private NamedQueueRecorder namedQueueRecorder = null;
private BootstrapNodeManager bootstrapNodeManager;
/**
* True if this RegionServer is coming up in a cluster where there is no Master;
* means it needs to just come up and make do without a Master to talk to: e.g. in test or
@ -688,12 +685,6 @@ public class HRegionServer extends Thread implements
}
this.rpcServices.start(zooKeeper);
this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
if (!(this instanceof HMaster)) {
// do not create this field for HMaster, we have another region server tracker for HMaster.
this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this);
} else {
this.regionServerAddressTracker = null;
}
// This violates 'no starting stuff in Constructor' but Master depends on the below chore
// and executor being created and takes a different startup route. Lots of overlap between HRS
// and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
@ -923,6 +914,9 @@ public class HRegionServer extends Thread implements
try {
initializeZooKeeper();
setupClusterConnection();
if (!(this instanceof HMaster)) {
bootstrapNodeManager = new BootstrapNodeManager(clusterConnection, masterAddressTracker);
}
// Setup RPC client for master communication
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
@ -2683,7 +2677,9 @@ public class HRegionServer extends Thread implements
// TODO: cancel will not cleanup the chores, so we need make sure we do not miss any
choreService.shutdown();
}
if (bootstrapNodeManager != null) {
bootstrapNodeManager.stop();
}
if (this.cacheFlusher != null) {
this.cacheFlusher.join();
}
@ -3968,8 +3964,8 @@ public class HRegionServer extends Thread implements
return masterAddressTracker.getBackupMasters();
}
public Iterator<ServerName> getRegionServers() {
return regionServerAddressTracker.getRegionServers().iterator();
public Iterator<ServerName> getBootstrapNodes() {
return bootstrapNodeManager.getBootstrapNodes().iterator();
}
public MetaRegionLocationCache getMetaRegionLocationCache() {

View File

@ -218,6 +218,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@ -280,9 +283,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
*/
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class RSRpcServices implements HBaseRPCErrorHandler,
AdminService.BlockingInterface, ClientService.BlockingInterface,
ClientMetaService.BlockingInterface, PriorityFunction, ConfigurationObserver {
public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.BlockingInterface,
ClientService.BlockingInterface, ClientMetaService.BlockingInterface,
BootstrapNodeService.BlockingInterface, PriorityFunction, ConfigurationObserver {
protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
/** RPC scheduler to use for the region server. */
@ -4076,11 +4079,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
int maxNodeCount = regionServer.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
sample.add(regionServer.getRegionServers());
sample.add(regionServer.getBootstrapNodes());
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
sample.getSamplingResult().stream().map(ProtobufUtil::toServerName)
.forEach(builder::addServerName);
return builder.build();
}
/**
 * Returns every bootstrap node this region server currently holds, converted to protobuf, so
 * that another region server can merge them into its own list.
 */
@Override
public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
  GetAllBootstrapNodesRequest request) throws ServiceException {
  GetAllBootstrapNodesResponse.Builder response = GetAllBootstrapNodesResponse.newBuilder();
  regionServer.getBootstrapNodes().forEachRemaining(node -> {
    response.addNode(ProtobufUtil.toServerName(node));
  });
  return response.build();
}
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.BootstrapNodeManager;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -61,6 +62,7 @@ public class TestRpcConnectionRegistry {
UTIL.getConfiguration().setLong(RpcConnectionRegistry.INITIAL_REFRESH_DELAY_SECS, 1);
UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
UTIL.getConfiguration().setLong(BootstrapNodeManager.REQUEST_MASTER_MIN_INTERVAL_SECS, 1);
UTIL.startMiniCluster(3);
HBaseTestingUtility.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
}

View File

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Tests {@link BootstrapNodeManager} against a mocked {@link ClusterConnection}, with all the
 * intervals shortened to a few seconds and the bootstrap node limit set to 2.
 */
@Category({ RegionServerTests.class, SmallTests.class })
public class TestBootstrapNodeManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBootstrapNodeManager.class);

  private Configuration conf;

  private ClusterConnection conn;

  private MasterAddressTracker tracker;

  private BootstrapNodeManager manager;

  @Before
  public void setUp() {
    conf = HBaseConfiguration.create();
    conf.setLong(BootstrapNodeManager.REQUEST_MASTER_INTERVAL_SECS, 5);
    conf.setLong(BootstrapNodeManager.REQUEST_MASTER_MIN_INTERVAL_SECS, 1);
    conf.setLong(BootstrapNodeManager.REQUEST_REGIONSERVER_INTERVAL_SECS, 1);
    conf.setInt(RSRpcServices.CLIENT_BOOTSTRAP_NODE_LIMIT, 2);
    conn = mock(ClusterConnection.class);
    when(conn.getConfiguration()).thenReturn(conf);
    tracker = mock(MasterAddressTracker.class);
  }

  @After
  public void tearDown() {
    if (manager != null) {
      manager.stop();
    }
  }

  /**
   * Asserts that the two lists hold the same servers, ignoring the order: the sizes must match
   * and every expected server must be present in the actual list.
   */
  private void assertListEquals(List<ServerName> expected, List<ServerName> actual) {
    // compare against the actual size; comparing expected with itself can never fail
    assertEquals(expected.size(), actual.size());
    assertThat(actual, hasItems(expected.toArray(new ServerName[0])));
  }

  @Test
  public void testNormal() throws Exception {
    List<ServerName> regionServers =
      Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server2", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server3", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server4", 12345, EnvironmentEdgeManager.currentTime()));
    when(conn.getLiveRegionServers(any(), anyInt())).thenReturn(regionServers);
    when(conn.getAllBootstrapNodes(any())).thenReturn(regionServers);
    manager = new BootstrapNodeManager(conn, tracker);
    Thread.sleep(3000);
    // master returned enough servers, so afterwards only region servers should be requested
    verify(conn, times(1)).getLiveRegionServers(any(), anyInt());
    verify(conn, atLeastOnce()).getAllBootstrapNodes(any());
    assertListEquals(regionServers, manager.getBootstrapNodes());
  }

  // if we do not return enough region servers, we will always get from master
  @Test
  public void testOnlyMaster() throws Exception {
    List<ServerName> regionServers =
      Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()));
    when(conn.getLiveRegionServers(any(), anyInt())).thenReturn(regionServers);
    when(conn.getAllBootstrapNodes(any())).thenReturn(regionServers);
    manager = new BootstrapNodeManager(conn, tracker);
    Thread.sleep(3000);
    verify(conn, atLeast(2)).getLiveRegionServers(any(), anyInt());
    verify(conn, never()).getAllBootstrapNodes(any());
    assertListEquals(regionServers, manager.getBootstrapNodes());
  }

  @Test
  public void testRegionServerError() throws Exception {
    List<ServerName> regionServers =
      Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server2", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server3", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server4", 12345, EnvironmentEdgeManager.currentTime()));
    List<ServerName> newRegionServers =
      Arrays.asList(ServerName.valueOf("server5", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server6", 12345, EnvironmentEdgeManager.currentTime()));
    when(conn.getLiveRegionServers(any(), anyInt())).thenReturn(regionServers);
    when(conn.getAllBootstrapNodes(any())).thenAnswer(invocation -> {
      if (invocation.getArgument(0, ServerName.class).getHostname().equals("server4")) {
        throw new IOException("Inject error");
      } else {
        return regionServers.subList(0, 3);
      }
    });
    manager = new BootstrapNodeManager(conn, tracker);
    // we should remove server4 from the list
    Waiter.waitFor(conf, 30000, () -> manager.getBootstrapNodes().size() == 3);
    assertListEquals(regionServers.subList(0, 3), manager.getBootstrapNodes());

    when(conn.getLiveRegionServers(any(), anyInt())).thenReturn(newRegionServers);
    doAnswer(invocation -> {
      String hostname = invocation.getArgument(0, ServerName.class).getHostname();
      switch (hostname) {
        case "server1":
          return regionServers.subList(0, 1);
        case "server2":
        case "server3":
          throw new IOException("Inject error");
        default:
          return newRegionServers;
      }
    }).when(conn).getAllBootstrapNodes(any());
    // we should remove server2, server3 from the list and then get the new list from master again
    Waiter.waitFor(conf, 30000, () -> {
      List<ServerName> bootstrapNodes = manager.getBootstrapNodes();
      if (bootstrapNodes.size() != 2) {
        return false;
      }
      String hostname = bootstrapNodes.get(0).getHostname();
      return hostname.equals("server5") || hostname.equals("server6");
    });
    assertListEquals(newRegionServers, manager.getBootstrapNodes());
  }
}

View File

@ -1,78 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
/**
 * Keeps an in-memory list of the region servers of a cluster, built from the children of the
 * region server znode ({@code rsZNode}).
 * <p>
 * The list is loaded once when the tracker is constructed and reloaded every time a children
 * changed event for the region server znode is delivered. The list handed out by
 * {@link #getRegionServers()} is unmodifiable and is replaced as a whole on every reload.
 */
@InterfaceAudience.Private
public class RegionServerAddressTracker extends ZKListener {

  private static final Logger LOG = LoggerFactory.getLogger(RegionServerAddressTracker.class);

  // Replaced as a whole on reload; volatile so readers see the latest published list.
  private volatile List<ServerName> regionServers = Collections.emptyList();

  // Notified when listing the region server znode fails.
  private final Abortable abortable;

  /**
   * Registers this tracker as a listener on the given watcher and loads the initial list.
   * @param watcher the zookeeper watcher used to list the region server znode
   * @param abortable aborted when the region server list can not be read from zookeeper
   */
  public RegionServerAddressTracker(ZKWatcher watcher, Abortable abortable) {
    super(watcher);
    this.abortable = abortable;
    watcher.registerListener(this);
    loadRegionServerList();
  }

  /**
   * @return the most recently loaded region servers, as an unmodifiable list; empty if none
   *         have been found
   */
  public List<ServerName> getRegionServers() {
    return regionServers;
  }

  /**
   * Reloads the region server list when the children of the region server znode changed;
   * events for any other path are ignored.
   */
  @Override
  public void nodeChildrenChanged(String path) {
    if (!path.equals(watcher.getZNodePaths().rsZNode)) {
      return;
    }
    loadRegionServerList();
  }

  /**
   * Lists the children of the region server znode and publishes them as the new region server
   * list. On a {@link KeeperException} the error is logged, the abortable is aborted and the
   * previously published list is left untouched.
   */
  private void loadRegionServerList() {
    List<String> children;
    try {
      children =
        ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
    } catch (KeeperException e) {
      LOG.error("failed to list region servers", e);
      abortable.abort("failed to list region servers", e);
      return;
    }
    List<ServerName> loaded = Collections.emptyList();
    if (!CollectionUtils.isEmpty(children)) {
      loaded = Collections.unmodifiableList(
        children.stream().map(ServerName::parseServerName).collect(Collectors.toList()));
    }
    regionServers = loaded;
  }
}

View File

@ -1,121 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
 * Tests that {@link RegionServerAddressTracker} follows the children of the region server
 * znode as they are created and deleted on a mini zookeeper cluster.
 */
@Category({ ZKTests.class, MediumTests.class })
public class TestRegionServerAddressTracker {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionServerAddressTracker.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerAddressTracker.class);

  private static final HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility();

  @Rule
  public TestName name = new TestName();

  private ZKWatcher zk;

  private RegionServerAddressTracker tracker;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniZKCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniZKCluster();
  }

  @Before
  public void setUp() throws ZooKeeperConnectionException, IOException, KeeperException {
    // every test method gets its own parent znode, named after the method
    String methodName = name.getMethodName();
    TEST_UTIL.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + methodName);
    zk = new ZKWatcher(TEST_UTIL.getConfiguration(), methodName, null);
    ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
    tracker = new RegionServerAddressTracker(zk, new WarnOnlyAbortable());
  }

  @After
  public void tearDown() throws IOException {
    Closeables.close(zk, true);
  }

  /** Builds the path of the child znode under the region server znode for the given server. */
  private String znodeFor(ServerName server) {
    return ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, server.toString());
  }

  @Test
  public void test() throws KeeperException {
    // add the first region server
    ServerName first = ServerName.valueOf("127.0.0.1", 16000, EnvironmentEdgeManager.currentTime());
    ZKUtil.createWithParents(zk, znodeFor(first));
    TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1);
    assertEquals(first, tracker.getRegionServers().get(0));

    // add the second region server
    ServerName second =
      ServerName.valueOf("127.0.0.2", 16000, EnvironmentEdgeManager.currentTime());
    ZKUtil.createWithParents(zk, znodeFor(second));
    TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 2);
    assertThat(tracker.getRegionServers(), hasItems(first, second));

    // remove the first one, only the second should remain
    ZKUtil.deleteNode(zk, znodeFor(first));
    TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1);
    assertEquals(second, tracker.getRegionServers().get(0));

    // remove the second one, the tracker should end up empty
    ZKUtil.deleteNode(zk, znodeFor(second));
    TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().isEmpty());
  }

  /** An {@link Abortable} that only logs the abort request and never reports as aborted. */
  private static final class WarnOnlyAbortable implements Abortable {

    @Override
    public void abort(String why, Throwable e) {
      LOG.warn("RegionServerAddressTracker received abort, ignoring. Reason: {}", why, e);
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  }
}