HBASE-26220 Use P2P communication between region servers to sync the list of bootstrap nodes (#3697)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
Duo Zhang, 2021-09-29 21:12:58 +08:00, committed by GitHub
parent f000b77532
commit 1152a61b5b
18 changed files with 573 additions and 215 deletions

File: AsyncConnectionImpl.java

@@ -86,11 +86,11 @@ class AsyncConnectionImpl implements AsyncConnection {
final AsyncConnectionConfiguration connConf;
- private final User user;
+ protected final User user;
final ConnectionRegistry registry;
- private final int rpcTimeout;
+ protected final int rpcTimeout;
protected final RpcClient rpcClient;

File: BootstrapNode.proto (new file)

@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
// The protos for exchanging bootstrap nodes between region servers.
package hbase.pb;
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "BootstrapNodeProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "HBase.proto";
message GetAllBootstrapNodesRequest {
}
message GetAllBootstrapNodesResponse {
repeated ServerName node = 1;
}
service BootstrapNodeService {
rpc GetAllBootstrapNodes(GetAllBootstrapNodesRequest)
returns(GetAllBootstrapNodesResponse);
}
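
Not part of the commit: a minimal sketch of what a caller of the new service could look like, using the blocking stub that java_generic_services also generates (the commit itself uses the non-blocking stub, as shown in AsyncClusterConnectionImpl below). The rpcClient, peer, user, and rpcTimeout parameters are assumed to be supplied by the caller.

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

final class GetAllBootstrapNodesSketch {
  static List<ServerName> fetch(RpcClient rpcClient, ServerName peer, User user, int rpcTimeout)
      throws IOException, ServiceException {
    // Build a blocking stub over an HBase RPC channel to the peer region server.
    BootstrapNodeService.BlockingInterface stub = BootstrapNodeService
        .newBlockingStub(rpcClient.createBlockingRpcChannel(peer, user, rpcTimeout));
    // Passing a null controller is acceptable; HBase creates one internally.
    return stub.getAllBootstrapNodes(null, GetAllBootstrapNodesRequest.getDefaultInstance())
        .getNodeList().stream().map(ProtobufUtil::toServerName).collect(Collectors.toList());
  }
}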

File: RegionServerStatus.proto

@@ -175,6 +175,15 @@ message FileArchiveNotificationRequest {
message FileArchiveNotificationResponse {
}
message GetLiveRegionServersRequest {
required uint32 count = 1;
}
message GetLiveRegionServersResponse {
repeated ServerName server = 1;
required uint32 total = 2;
}
service RegionServerStatusService {
/** Called when a region server first starts. */
rpc RegionServerStartup(RegionServerStartupRequest)
@@ -217,4 +226,8 @@ service RegionServerStatusService {
/** Reports files that were moved to the archive directory for space quotas */
rpc ReportFileArchival(FileArchiveNotificationRequest)
returns(FileArchiveNotificationResponse);
/** Get some live region servers to be used as seeds for bootstrap nodes */
rpc GetLiveRegionServers(GetLiveRegionServersRequest)
returns(GetLiveRegionServersResponse);
}

File: HBaseRpcServicesBase.java

@@ -301,7 +301,7 @@ public abstract class HBaseRpcServicesBase<S extends HBaseServerBase<?>>
int maxNodeCount = server.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
- sample.add(server.getRegionServers());
+ sample.add(server.getBootstrapNodes());
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
sample.getSamplingResult().stream().map(ProtobufUtil::toServerName)
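
For context, ReservoirSample is an HBase utility that caps the reply at CLIENT_BOOTSTRAP_NODE_LIMIT nodes while giving every bootstrap node an equal chance of being returned. Below is a hedged, self-contained sketch of classic reservoir sampling (Algorithm R), the technique a utility like this can be assumed to implement; it is not the actual HBase class.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

final class ReservoirSampleSketch<T> {
  private final List<T> samples = new ArrayList<>();
  private final int k; // maximum number of samples to keep
  private int seen; // how many items have been offered so far

  ReservoirSampleSketch(int k) {
    this.k = k;
  }

  void add(T item) {
    seen++;
    if (samples.size() < k) {
      samples.add(item); // fill the reservoir first
    } else {
      // keep the new item with probability k / seen, evicting a random slot
      int idx = ThreadLocalRandom.current().nextInt(seen);
      if (idx < k) {
        samples.set(idx, item);
      }
    }
  }

  void add(Iterator<T> iter) {
    iter.forEachRemaining(this::add);
  }

  List<T> getSamplingResult() {
    return samples;
  }
}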

File: AsyncClusterConnection.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.security.token.Token;
import org.apache.yetus.audience.InterfaceAudience;
@@ -98,4 +99,15 @@ public interface AsyncClusterConnection extends AsyncConnection {
* Clean up after finishing bulk load, whether it succeeded or not.
*/
CompletableFuture<Void> cleanupBulkLoad(TableName tableName, String bulkToken);
/**
* Get live region servers from masters.
*/
CompletableFuture<List<ServerName>> getLiveRegionServers(MasterAddressTracker masterAddrTracker,
int count);
/**
* Get the bootstrap node list of another region server.
*/
CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer);
}

File: AsyncClusterConnectionImpl.java

@@ -20,21 +20,26 @@ package org.apache.hadoop.hbase.client;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.security.token.Token;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
@@ -43,6 +48,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
/**
* The implementation of AsyncClusterConnection.
@@ -132,4 +139,41 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
}, (s, c, req, done) -> s.cleanupBulkLoad(c, req, done), (c, resp) -> null))
.call();
}
@Override
public CompletableFuture<List<ServerName>>
getLiveRegionServers(MasterAddressTracker masterAddrTracker, int count) {
CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
RegionServerStatusService.Interface stub = RegionServerStatusService
.newStub(rpcClient.createRpcChannel(masterAddrTracker.getMasterAddress(), user, rpcTimeout));
HBaseRpcController controller = rpcControllerFactory.newController();
stub.getLiveRegionServers(controller,
GetLiveRegionServersRequest.newBuilder().setCount(count).build(), resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
future.complete(resp.getServerList().stream().map(ProtobufUtil::toServerName)
.collect(Collectors.toList()));
}
});
return future;
}
@Override
public CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer) {
CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
BootstrapNodeService.Interface stub =
BootstrapNodeService.newStub(rpcClient.createRpcChannel(regionServer, user, rpcTimeout));
HBaseRpcController controller = rpcControllerFactory.newController();
stub.getAllBootstrapNodes(controller, GetAllBootstrapNodesRequest.getDefaultInstance(),
resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
future.complete(resp.getNodeList().stream().map(ProtobufUtil::toServerName)
.collect(Collectors.toList()));
}
});
return future;
}
}

File: ConnectionRegistryEndpoint.java

@@ -46,9 +46,9 @@ public interface ConnectionRegistryEndpoint {
List<ServerName> getBackupMasters();
/**
- * Get a iterator of the region servers which could be used as bootstrap nodes.
+ * Get an iterator of the available bootstrap nodes.
*/
- Iterator<ServerName> getRegionServers();
+ Iterator<ServerName> getBootstrapNodes();
/**
* Get the location of meta regions.

File: HMaster.java

@@ -4003,7 +4003,7 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
}
@Override
- public Iterator<ServerName> getRegionServers() {
+ public Iterator<ServerName> getBootstrapNodes() {
return regionServerTracker.getRegionServers().iterator();
}
@@ -4011,4 +4011,8 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
public List<HRegionLocation> getMetaLocations() {
return metaRegionLocationCache.getMetaRegionLocations();
}
public Collection<ServerName> getLiveRegionServers() {
return regionServerTracker.getRegionServers();
}
}

File: MasterRpcServices.java

@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
@@ -380,6 +381,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
@@ -3477,4 +3480,16 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
ExecuteProceduresRequest request) throws ServiceException {
throw new ServiceException(new DoNotRetryIOException("Unsupported method on master"));
}
@Override
public GetLiveRegionServersResponse getLiveRegionServers(RpcController controller,
GetLiveRegionServersRequest request) throws ServiceException {
List<ServerName> regionServers = new ArrayList<>(server.getLiveRegionServers());
Collections.shuffle(regionServers, ThreadLocalRandom.current());
GetLiveRegionServersResponse.Builder builder =
GetLiveRegionServersResponse.newBuilder().setTotal(regionServers.size());
regionServers.stream().limit(request.getCount()).map(ProtobufUtil::toServerName)
.forEach(builder::addServer);
return builder.build();
}
}

File: MasterRpcServicesVersionWrapper.java

@@ -25,6 +25,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersResponse;
/**
* A wrapper class for MasterRpcServices shortcut that ensures a client version is available
@@ -108,4 +110,10 @@ public class MasterRpcServicesVersionWrapper
throws ServiceException {
return masterRpcServices.reportFileArchival(controller, request);
}
@Override
public GetLiveRegionServersResponse getLiveRegionServers(RpcController controller,
GetLiveRegionServersRequest request) throws ServiceException {
return masterRpcServices.getLiveRegionServers(controller, request);
}
}

File: BootstrapNodeManager.java (new file)

@@ -0,0 +1,218 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseRpcServicesBase;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Manage the bootstrap node list at region server side.
* <p/>
* It will request master first to get the initial set of bootstrap nodes (a subset of the live
* region servers), and then it will exchange bootstrap nodes with the other bootstrap nodes. In
* most cases, if the cluster is stable, we do not need to request master again until we reach the
* request master interval. And if the current number of bootstrap nodes is not enough, we will
* request master soon.
* <p/>
* The algorithm is very simple, as we will always fall back to requesting master. The trick here
* is that, if we can not get enough bootstrap nodes from master, the cluster must be small, so
* always requesting master will not put too much pressure on it. And for large clusters, we will
* soon get enough bootstrap nodes and stop requesting master.
*/
@InterfaceAudience.Private
public class BootstrapNodeManager {
private static final Logger LOG = LoggerFactory.getLogger(BootstrapNodeManager.class);
public static final String REQUEST_MASTER_INTERVAL_SECS =
"hbase.server.bootstrap.request_master_interval.secs";
// default request every 10 minutes
public static final long DEFAULT_REQUEST_MASTER_INTERVAL_SECS = TimeUnit.MINUTES.toSeconds(10);
public static final String REQUEST_MASTER_MIN_INTERVAL_SECS =
"hbase.server.bootstrap.request_master_min_interval.secs";
// default 30 seconds
public static final long DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS = 30;
public static final String REQUEST_REGIONSERVER_INTERVAL_SECS =
"hbase.server.bootstrap.request_regionserver_interval.secs";
// default request every 30 seconds
public static final long DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS = 30;
private static final float JITTER = 0.2f;
private volatile List<ServerName> nodes = Collections.emptyList();
private final AsyncClusterConnection conn;
private final MasterAddressTracker masterAddrTracker;
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(getClass().getSimpleName()).build());
private final long requestMasterIntervalSecs;
private final long requestMasterMinIntervalSecs;
private final long requestRegionServerIntervalSecs;
private final int maxNodeCount;
private final RetryCounterFactory retryCounterFactory;
private RetryCounter retryCounter;
private long lastRequestMasterTime;
public BootstrapNodeManager(AsyncClusterConnection conn, MasterAddressTracker masterAddrTracker) {
this.conn = conn;
this.masterAddrTracker = masterAddrTracker;
Configuration conf = conn.getConfiguration();
requestMasterIntervalSecs =
conf.getLong(REQUEST_MASTER_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_INTERVAL_SECS);
requestMasterMinIntervalSecs =
conf.getLong(REQUEST_MASTER_MIN_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS);
requestRegionServerIntervalSecs =
conf.getLong(REQUEST_REGIONSERVER_INTERVAL_SECS, DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS);
maxNodeCount = conf.getInt(HBaseRpcServicesBase.CLIENT_BOOTSTRAP_NODE_LIMIT,
HBaseRpcServicesBase.DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
retryCounterFactory = new RetryCounterFactory(
new RetryConfig().setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()).setJitter(JITTER)
.setSleepInterval(requestMasterMinIntervalSecs).setMaxSleepTime(requestMasterIntervalSecs)
.setTimeUnit(TimeUnit.SECONDS));
executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
TimeUnit.SECONDS);
}
private long getDelay(long delay) {
long jitterDelay = (long) (delay * ThreadLocalRandom.current().nextFloat() * JITTER);
return delay + jitterDelay;
}
private void getFromMaster() {
List<ServerName> liveRegionServers;
try {
// get 2 times the number of nodes
liveRegionServers =
FutureUtils.get(conn.getLiveRegionServers(masterAddrTracker, maxNodeCount * 2));
} catch (IOException e) {
LOG.warn("failed to get live region servers from master", e);
if (retryCounter == null) {
retryCounter = retryCounterFactory.create();
}
executor.schedule(this::getFromMaster, retryCounter.getBackoffTimeAndIncrementAttempts(),
TimeUnit.SECONDS);
return;
}
retryCounter = null;
lastRequestMasterTime = EnvironmentEdgeManager.currentTime();
this.nodes = Collections.unmodifiableList(liveRegionServers);
if (liveRegionServers.size() < maxNodeCount) {
// If the number of live region servers is small, it means the cluster is small, so requesting
// master at a higher frequency will not be a big problem; here we will always request master
// to get the live region servers as bootstrap nodes.
executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
TimeUnit.SECONDS);
return;
}
// schedule tasks to exchange the bootstrap nodes with other region servers.
executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
TimeUnit.SECONDS);
}
// this method is also used to test whether a given region server is still alive.
private void getFromRegionServer() {
if (EnvironmentEdgeManager.currentTime() - lastRequestMasterTime >= TimeUnit.SECONDS
.toMillis(requestMasterIntervalSecs)) {
// schedule a get from master task immediately if we haven't requested master for more than
// requestMasterIntervalSecs
executor.execute(this::getFromMaster);
return;
}
List<ServerName> currentList = this.nodes;
ServerName peer = currentList.get(ThreadLocalRandom.current().nextInt(currentList.size()));
List<ServerName> otherList;
try {
otherList = FutureUtils.get(conn.getAllBootstrapNodes(peer));
} catch (IOException e) {
LOG.warn("failed to request region server {}", peer, e);
// remove this region server from the list since it cannot respond successfully
List<ServerName> newList = currentList.stream().filter(sn -> sn != peer)
.collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
this.nodes = newList;
if (newList.size() < maxNodeCount) {
// schedule a get from master task immediately
executor.execute(this::getFromMaster);
} else {
executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
TimeUnit.SECONDS);
}
return;
}
// randomly select new live region server list
Set<ServerName> newRegionServers = new HashSet<ServerName>(currentList);
newRegionServers.addAll(otherList);
List<ServerName> newList = new ArrayList<ServerName>(newRegionServers);
Collections.shuffle(newList, ThreadLocalRandom.current());
int expectedListSize = maxNodeCount * 2;
if (newList.size() <= expectedListSize) {
this.nodes = Collections.unmodifiableList(newList);
} else {
this.nodes =
Collections.unmodifiableList(new ArrayList<>(newList.subList(0, expectedListSize)));
}
// schedule a new get from region server task
executor.schedule(this::getFromRegionServer, requestRegionServerIntervalSecs, TimeUnit.SECONDS);
}
public void stop() {
executor.shutdownNow();
}
public List<ServerName> getBootstrapNodes() {
return nodes;
}
}
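
The manager's cadence is driven by the three plain Configuration keys declared above. Not part of the commit, and with illustrative values only: a sketch of tuning them programmatically, the same way TestBootstrapNodeManager below does.

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.BootstrapNodeManager;

final class BootstrapNodeTuningSketch {
  static Configuration tuned() {
    Configuration conf = HBaseConfiguration.create();
    // Refresh the seed list from master at most every 5 minutes (default 10).
    conf.setLong(BootstrapNodeManager.REQUEST_MASTER_INTERVAL_SECS,
      TimeUnit.MINUTES.toSeconds(5));
    // Wait at least 15 seconds between consecutive master requests (default 30).
    conf.setLong(BootstrapNodeManager.REQUEST_MASTER_MIN_INTERVAL_SECS, 15);
    // Exchange bootstrap nodes with a random peer every 20 seconds (default 30).
    conf.setLong(BootstrapNodeManager.REQUEST_REGIONSERVER_INTERVAL_SECS, 20);
    return conf;
  }
}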

File: HRegionServer.java

@@ -151,7 +151,6 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
- import org.apache.hadoop.hbase.zookeeper.RegionServerAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -352,11 +351,6 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
// master address tracker
private final MasterAddressTracker masterAddressTracker;
- /**
-  * Cache for all the region servers in the cluster. Used for serving ClientMetaService.
-  */
- private final RegionServerAddressTracker regionServerAddressTracker;
// Log Splitting Worker
private SplitLogWorker splitLogWorker;
@@ -447,6 +441,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
private FileSystemUtilizationChore fsUtilizationChore;
+ private BootstrapNodeManager bootstrapNodeManager;
/**
* True if this RegionServer is coming up in a cluster where there is no Master;
* means it needs to just come up and make do without a Master to talk to: e.g. in test or
@@ -511,7 +507,6 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
masterAddressTracker = null;
}
this.rpcServices.start(zooKeeper);
- this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this);
} catch (Throwable t) {
// Make sure we log the exception. HRegionServer is often started via reflection and the
// cause of failed startup is lost.
@@ -644,6 +639,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
try {
initializeZooKeeper();
setupClusterConnection();
+ bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
// Setup RPC client for master communication
this.rpcClient = asyncClusterConnection.getRpcClient();
} catch (Throwable t) {
@@ -2277,6 +2273,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
protected void stopServiceThreads() {
// clean up the scheduled chores
stopChoreService();
+ if (bootstrapNodeManager != null) {
+   bootstrapNodeManager.stop();
+ }
if (this.cacheFlusher != null) {
this.cacheFlusher.join();
}
@@ -3430,8 +3429,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
}
@Override
- public Iterator<ServerName> getRegionServers() {
-   return regionServerAddressTracker.getRegionServers().iterator();
+ public Iterator<ServerName> getBootstrapNodes() {
+   return bootstrapNodeManager.getBootstrapNodes().iterator();
}
@Override

File: RSRpcServices.java

@@ -196,6 +196,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@@ -244,7 +247,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
*/
@InterfaceAudience.Private
public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
- implements ClientService.BlockingInterface {
+ implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {
private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
@@ -3839,4 +3842,13 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
throw new ServiceException(e);
}
}
@Override
public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
GetAllBootstrapNodesRequest request) throws ServiceException {
GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
server.getBootstrapNodes()
.forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
return builder.build();
}
}

File: DummyAsyncClusterConnection.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
@@ -157,4 +158,15 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
public Connection toConnection() {
return null;
}
@Override
public CompletableFuture<List<ServerName>>
getLiveRegionServers(MasterAddressTracker masterAddrTracker, int count) {
return null;
}
@Override
public CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer) {
return null;
}
}

File: TestRpcConnectionRegistry.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.BootstrapNodeManager;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -61,6 +62,7 @@ public class TestRpcConnectionRegistry {
UTIL.getConfiguration().setLong(RpcConnectionRegistry.INITIAL_REFRESH_DELAY_SECS, 1);
UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
UTIL.getConfiguration().setLong(BootstrapNodeManager.REQUEST_MASTER_MIN_INTERVAL_SECS, 1);
UTIL.startMiniCluster(3);
HBaseTestingUtil.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
}

File: TestBootstrapNodeManager.java (new file)

@@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseRpcServicesBase;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, SmallTests.class })
public class TestBootstrapNodeManager {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBootstrapNodeManager.class);
private Configuration conf;
private AsyncClusterConnection conn;
private MasterAddressTracker tracker;
private BootstrapNodeManager manager;
@Before
public void setUp() {
conf = HBaseConfiguration.create();
conf.setLong(BootstrapNodeManager.REQUEST_MASTER_INTERVAL_SECS, 5);
conf.setLong(BootstrapNodeManager.REQUEST_MASTER_MIN_INTERVAL_SECS, 1);
conf.setLong(BootstrapNodeManager.REQUEST_REGIONSERVER_INTERVAL_SECS, 1);
conf.setInt(HBaseRpcServicesBase.CLIENT_BOOTSTRAP_NODE_LIMIT, 2);
conn = mock(AsyncClusterConnection.class);
when(conn.getConfiguration()).thenReturn(conf);
tracker = mock(MasterAddressTracker.class);
}
@After
public void tearDown() {
if (manager != null) {
manager.stop();
}
}
private void assertListEquals(List<ServerName> expected, List<ServerName> actual) {
assertEquals(expected.size(), actual.size());
assertThat(actual, hasItems(expected.toArray(new ServerName[0])));
}
@Test
public void testNormal() throws Exception {
List<ServerName> regionServers =
Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()),
ServerName.valueOf("server2", 12345, EnvironmentEdgeManager.currentTime()),
ServerName.valueOf("server3", 12345, EnvironmentEdgeManager.currentTime()),
ServerName.valueOf("server4", 12345, EnvironmentEdgeManager.currentTime()));
when(conn.getLiveRegionServers(any(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(regionServers));
when(conn.getAllBootstrapNodes(any()))
.thenReturn(CompletableFuture.completedFuture(regionServers));
manager = new BootstrapNodeManager(conn, tracker);
Thread.sleep(3000);
verify(conn, times(1)).getLiveRegionServers(any(), anyInt());
verify(conn, atLeastOnce()).getAllBootstrapNodes(any());
assertListEquals(regionServers, manager.getBootstrapNodes());
}
// if we do not return enough region servers, we will always get from master
@Test
public void testOnlyMaster() throws Exception {
List<ServerName> regionServers =
Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()));
when(conn.getLiveRegionServers(any(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(regionServers));
when(conn.getAllBootstrapNodes(any()))
.thenReturn(CompletableFuture.completedFuture(regionServers));
manager = new BootstrapNodeManager(conn, tracker);
Thread.sleep(3000);
verify(conn, atLeast(2)).getLiveRegionServers(any(), anyInt());
verify(conn, never()).getAllBootstrapNodes(any());
assertListEquals(regionServers, manager.getBootstrapNodes());
}
@Test
public void testRegionServerError() throws Exception {
List<ServerName> regionServers =
Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()),
ServerName.valueOf("server2", 12345, EnvironmentEdgeManager.currentTime()),
ServerName.valueOf("server3", 12345, EnvironmentEdgeManager.currentTime()),
ServerName.valueOf("server4", 12345, EnvironmentEdgeManager.currentTime()));
List<ServerName> newRegionServers =
Arrays.asList(ServerName.valueOf("server5", 12345, EnvironmentEdgeManager.currentTime()),
ServerName.valueOf("server6", 12345, EnvironmentEdgeManager.currentTime()));
when(conn.getLiveRegionServers(any(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(regionServers));
when(conn.getAllBootstrapNodes(any())).thenAnswer(invocation -> {
if (invocation.getArgument(0, ServerName.class).getHostname().equals("server4")) {
return FutureUtils.failedFuture(new IOException("Inject error"));
} else {
return CompletableFuture.completedFuture(regionServers.subList(0, 3));
}
});
manager = new BootstrapNodeManager(conn, tracker);
// we should remove server4 from the list
Waiter.waitFor(conf, 30000, () -> manager.getBootstrapNodes().size() == 3);
assertListEquals(regionServers.subList(0, 3), manager.getBootstrapNodes());
when(conn.getLiveRegionServers(any(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(newRegionServers));
doAnswer(invocation -> {
String hostname = invocation.getArgument(0, ServerName.class).getHostname();
switch (hostname) {
case "server1":
return CompletableFuture.completedFuture(regionServers.subList(0, 1));
case "server2":
case "server3":
return FutureUtils.failedFuture(new IOException("Inject error"));
default:
return CompletableFuture.completedFuture(newRegionServers);
}
}).when(conn).getAllBootstrapNodes(any());
// we should remove server2, server3 from the list and then get the new list from master again
Waiter.waitFor(conf, 30000, () -> {
List<ServerName> bootstrapNodes = manager.getBootstrapNodes();
if (bootstrapNodes.size() != 2) {
return false;
}
String hostname = bootstrapNodes.get(0).getHostname();
return hostname.equals("server5") || hostname.equals("server6");
});
assertListEquals(newRegionServers, manager.getBootstrapNodes());
}
}

File: RegionServerAddressTracker.java (deleted)

@@ -1,78 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
/**
* Class for tracking the region servers for a cluster.
*/
@InterfaceAudience.Private
public class RegionServerAddressTracker extends ZKListener {
private static final Logger LOG = LoggerFactory.getLogger(RegionServerAddressTracker.class);
private volatile List<ServerName> regionServers = Collections.emptyList();
private final Abortable abortable;
public RegionServerAddressTracker(ZKWatcher watcher, Abortable abortable) {
super(watcher);
this.abortable = abortable;
watcher.registerListener(this);
loadRegionServerList();
}
private void loadRegionServerList() {
List<String> names;
try {
names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
} catch (KeeperException e) {
LOG.error("failed to list region servers", e);
abortable.abort("failed to list region servers", e);
return;
}
if (CollectionUtils.isEmpty(names)) {
regionServers = Collections.emptyList();
} else {
regionServers = names.stream().map(ServerName::parseServerName)
.collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
}
}
@Override
public void nodeChildrenChanged(String path) {
if (path.equals(watcher.getZNodePaths().rsZNode)) {
loadRegionServerList();
}
}
public List<ServerName> getRegionServers() {
return regionServers;
}
}

File: TestRegionServerAddressTracker.java (deleted)

@@ -1,121 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@Category({ ZKTests.class, MediumTests.class })
public class TestRegionServerAddressTracker {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionServerAddressTracker.class);
private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerAddressTracker.class);
private static final HBaseZKTestingUtil TEST_UTIL = new HBaseZKTestingUtil();
private ZKWatcher zk;
private RegionServerAddressTracker tracker;
@Rule
public TestName name = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniZKCluster();
}
@Before
public void setUp() throws ZooKeeperConnectionException, IOException, KeeperException {
TEST_UTIL.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + name.getMethodName());
zk = new ZKWatcher(TEST_UTIL.getConfiguration(), name.getMethodName(), null);
ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
tracker = new RegionServerAddressTracker(zk, new WarnOnlyAbortable());
}
@After
public void tearDown() throws IOException {
Closeables.close(zk, true);
}
@Test
public void test() throws KeeperException {
ServerName rs1 = ServerName.valueOf("127.0.0.1", 16000, EnvironmentEdgeManager.currentTime());
ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs1.toString()));
TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1);
assertEquals(rs1, tracker.getRegionServers().get(0));
ServerName rs2 = ServerName.valueOf("127.0.0.2", 16000, EnvironmentEdgeManager.currentTime());
ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs2.toString()));
TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 2);
assertThat(tracker.getRegionServers(), hasItems(rs1, rs2));
ZKUtil.deleteNode(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs1.toString()));
TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1);
assertEquals(rs2, tracker.getRegionServers().get(0));
ZKUtil.deleteNode(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs2.toString()));
TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().isEmpty());
}
private static final class WarnOnlyAbortable implements Abortable {
@Override
public void abort(String why, Throwable e) {
LOG.warn("RegionServerAddressTracker received abort, ignoring. Reason: {}", why, e);
}
@Override
public boolean isAborted() {
return false;
}
}
}