diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 9968404de2c..760225967b0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.util.List; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MasterNotRunningException; @@ -323,4 +324,15 @@ public interface ClusterConnection extends Connection { * supports cell blocks. */ boolean hasCellBlockSupport(); + + /** + * Get live region servers from masters. + */ + List getLiveRegionServers(Supplier masterAddrTracker, int count) + throws IOException; + + /** + * Get the bootstrap node list of another region server. + */ + List getAllBootstrapNodes(ServerName regionServer) throws IOException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index fef193cac27..fcb2eb4f300 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -49,6 +49,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.CallQueueTooBigException; @@ -104,6 +106,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Get import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @@ -134,6 +139,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaSta import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersResponse; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; @@ -2216,4 +2224,37 @@ class ConnectionImplementation implements ClusterConnection, Closeable { throw new IOException(cause); } } + + @Override + public List getLiveRegionServers(Supplier masterAddrTracker, int count) + throws IOException { + RegionServerStatusService.BlockingInterface stub = RegionServerStatusService.newBlockingStub( + rpcClient.createBlockingRpcChannel(masterAddrTracker.get(), user, rpcTimeout)); + GetLiveRegionServersResponse resp; + try { + resp = stub.getLiveRegionServers(null, + GetLiveRegionServersRequest.newBuilder().setCount(count).build()); + } catch (ServiceException e) { + Throwable t = ConnectionUtils.translateException(e); + Throwables.propagateIfPossible(t, IOException.class); + throw new IOException(t); + } + return resp.getServerList().stream().map(ProtobufUtil::toServerName) + .collect(Collectors.toList()); + } + + @Override + public List getAllBootstrapNodes(ServerName regionServer) throws IOException { + BootstrapNodeService.BlockingInterface stub = BootstrapNodeService + .newBlockingStub(rpcClient.createBlockingRpcChannel(regionServer, user, rpcTimeout)); + GetAllBootstrapNodesResponse resp; + try { + resp = stub.getAllBootstrapNodes(null, GetAllBootstrapNodesRequest.getDefaultInstance()); + } catch (ServiceException e) { + Throwable t = ConnectionUtils.translateException(e); + Throwables.propagateIfPossible(t, IOException.class); + throw new IOException(t); + } + return resp.getNodeList().stream().map(ProtobufUtil::toServerName).collect(Collectors.toList()); + } } diff --git a/hbase-protocol-shaded/src/main/protobuf/BootstrapNode.proto b/hbase-protocol-shaded/src/main/protobuf/BootstrapNode.proto new file mode 100644 index 00000000000..a0c3a2fa660 --- /dev/null +++ b/hbase-protocol-shaded/src/main/protobuf/BootstrapNode.proto @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto2"; + +// The protos for exchange bootstrap nodes between region servers. 
+package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated"; +option java_outer_classname = "BootstrapNodeProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "HBase.proto"; + +message GetAllBootstrapNodesRequest { +} + +message GetAllBootstrapNodesResponse { + repeated ServerName node = 1; +} + +service BootstrapNodeService { + rpc GetAllBootstrapNodes(GetAllBootstrapNodesRequest) + returns(GetAllBootstrapNodesResponse); +} \ No newline at end of file diff --git a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto index 0137cb1608e..6aed5b46718 100644 --- a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto @@ -175,6 +175,15 @@ message FileArchiveNotificationRequest { message FileArchiveNotificationResponse { } +message GetLiveRegionServersRequest { + required uint32 count = 1; +} + +message GetLiveRegionServersResponse { + repeated ServerName server = 1; + required uint32 total = 2; +} + service RegionServerStatusService { /** Called when a region server first starts. */ rpc RegionServerStartup(RegionServerStartupRequest) @@ -217,4 +226,8 @@ service RegionServerStatusService { /** Reports files that were moved to the archive directory for space quotas */ rpc ReportFileArchival(FileArchiveNotificationRequest) returns(FileArchiveNotificationResponse); + + /** Get some live region servers to be used as seed for bootstrap nodes */ + rpc GetLiveRegionServers(GetLiveRegionServersRequest) + returns(GetLiveRegionServersResponse); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index e14006c9420..cbce761eca1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -2757,7 +2757,7 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public Iterator getRegionServers() { + public Iterator getBootstrapNodes() { return regionServerTracker.getRegionServers().iterator(); } @@ -3959,4 +3959,8 @@ public class HMaster extends HRegionServer implements MasterServices { MasterRegion getMasterRegion() { return masterRegion; } + + public Collection getLiveRegionServers() { + return regionServerTracker.getRegionServers(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index dd48f8ac18c..a64c990021a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -122,8 +123,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; -import 
org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -327,6 +326,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; @@ -3045,4 +3046,15 @@ public class MasterRpcServices extends RSRpcServices implements .addAllBalancerRejection(balancerRejections).build(); } + @Override + public GetLiveRegionServersResponse getLiveRegionServers(RpcController controller, + GetLiveRegionServersRequest request) throws ServiceException { + List regionServers = new ArrayList<>(master.getLiveRegionServers()); + Collections.shuffle(regionServers, ThreadLocalRandom.current()); + GetLiveRegionServersResponse.Builder builder = + GetLiveRegionServersResponse.newBuilder().setTotal(regionServers.size()); + regionServers.stream().limit(request.getCount()).map(ProtobufUtil::toServerName) + .forEach(builder::addServer); + return builder.build(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BootstrapNodeManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BootstrapNodeManager.java new file mode 100644 index 00000000000..856faed966f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BootstrapNodeManager.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit; +import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Manage the bootstrap node list at region server side. + *
<p/>
+ * It will request master first to get the initial set of bootstrap nodes (a subset of the live region + * servers), and then it will exchange bootstrap node lists with other bootstrap nodes. In most + * cases, if the cluster is stable, we do not need to request master again until we reach the + * request master interval. And if the current number of bootstrap nodes is not enough, we will + * request master soon. + *
<p/>
+ * The algorithm is very simple, as we will always fall back to requesting the master. The trick here is + * that, if we cannot get enough bootstrap nodes from master, then the cluster must be small, so it + * will not put too much pressure on master if we always request master. And for large clusters, we + * will soon get enough bootstrap nodes and stop requesting master. + */ +@InterfaceAudience.Private +public class BootstrapNodeManager { + + private static final Logger LOG = LoggerFactory.getLogger(BootstrapNodeManager.class); + + public static final String REQUEST_MASTER_INTERVAL_SECS = + "hbase.server.bootstrap.request_master_interval.secs"; + + // default request every 10 minutes + public static final long DEFAULT_REQUEST_MASTER_INTERVAL_SECS = TimeUnit.MINUTES.toSeconds(10); + + public static final String REQUEST_MASTER_MIN_INTERVAL_SECS = + "hbase.server.bootstrap.request_master_min_interval.secs"; + + // default 30 seconds + public static final long DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS = 30; + + public static final String REQUEST_REGIONSERVER_INTERVAL_SECS = + "hbase.server.bootstrap.request_regionserver_interval.secs"; + + // default request every 30 seconds + public static final long DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS = 30; + + private static final float JITTER = 0.2f; + + private volatile List<ServerName> nodes = Collections.emptyList(); + + private final ClusterConnection conn; + + private final MasterAddressTracker masterAddrTracker; + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(getClass().getSimpleName()).build()); + + private final long requestMasterIntervalSecs; + + private final long requestMasterMinIntervalSecs; + + private final long requestRegionServerIntervalSecs; + + private final int maxNodeCount; + + private final RetryCounterFactory retryCounterFactory; + + private RetryCounter retryCounter; + + private long lastRequestMasterTime; + + public BootstrapNodeManager(ClusterConnection conn, MasterAddressTracker masterAddrTracker) { + this.conn = conn; + this.masterAddrTracker = masterAddrTracker; + Configuration conf = conn.getConfiguration(); + requestMasterIntervalSecs = + conf.getLong(REQUEST_MASTER_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_INTERVAL_SECS); + requestMasterMinIntervalSecs = + conf.getLong(REQUEST_MASTER_MIN_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS); + requestRegionServerIntervalSecs = + conf.getLong(REQUEST_REGIONSERVER_INTERVAL_SECS, DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS); + maxNodeCount = conf.getInt(RSRpcServices.CLIENT_BOOTSTRAP_NODE_LIMIT, + RSRpcServices.DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT); + retryCounterFactory = new RetryCounterFactory( + new RetryConfig().setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()).setJitter(JITTER) + .setSleepInterval(requestMasterMinIntervalSecs).setMaxSleepTime(requestMasterIntervalSecs) + .setTimeUnit(TimeUnit.SECONDS)); + executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs), + TimeUnit.SECONDS); + } + + private long getDelay(long delay) { + long jitterDelay = (long) (delay * ThreadLocalRandom.current().nextFloat() * JITTER); + return delay + jitterDelay; + } + + private void getFromMaster() { + List<ServerName> liveRegionServers; + try { + // get 2 times the number of nodes + liveRegionServers = + conn.getLiveRegionServers(() -> masterAddrTracker.getMasterAddress(), maxNodeCount * 2); + } catch (IOException e) { + LOG.warn("failed to get live region servers from master", e); +
if (retryCounter == null) { + retryCounter = retryCounterFactory.create(); + } + executor.schedule(this::getFromMaster, retryCounter.getBackoffTimeAndIncrementAttempts(), + TimeUnit.SECONDS); + return; + } + retryCounter = null; + lastRequestMasterTime = EnvironmentEdgeManager.currentTime(); + this.nodes = Collections.unmodifiableList(liveRegionServers); + if (liveRegionServers.size() < maxNodeCount) { + // If the number of live region servers is small, it means the cluster is small, so requesting + // master with a higher frequency will not be a big problem, so here we will always request + // master to get the live region servers as bootstrap nodes. + executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs), + TimeUnit.SECONDS); + return; + } + // schedule tasks to exchange the bootstrap nodes with other region servers. + executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs), + TimeUnit.SECONDS); + } + + // this method is also used to test whether a given region server is still alive. + private void getFromRegionServer() { + if (EnvironmentEdgeManager.currentTime() - lastRequestMasterTime >= TimeUnit.SECONDS + .toMillis(requestMasterIntervalSecs)) { + // schedule a get from master task immediately if haven't request master for more than + // requestMasterIntervalSecs + executor.execute(this::getFromMaster); + return; + } + List currentList = this.nodes; + ServerName peer = currentList.get(ThreadLocalRandom.current().nextInt(currentList.size())); + List otherList; + try { + otherList = conn.getAllBootstrapNodes(peer); + } catch (IOException e) { + LOG.warn("failed to request region server {}", peer, e); + // remove this region server from the list since it can not respond successfully + List newList = currentList.stream().filter(sn -> sn != peer) + .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList)); + this.nodes = newList; + if (newList.size() < maxNodeCount) { + // schedule a get from master task immediately + executor.execute(this::getFromMaster); + } else { + executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs), + TimeUnit.SECONDS); + } + return; + } + // randomly select new live region server list + Set newRegionServers = new HashSet(currentList); + newRegionServers.addAll(otherList); + List newList = new ArrayList(newRegionServers); + Collections.shuffle(newList, ThreadLocalRandom.current()); + int expectedListSize = maxNodeCount * 2; + if (newList.size() <= expectedListSize) { + this.nodes = Collections.unmodifiableList(newList); + } else { + this.nodes = + Collections.unmodifiableList(new ArrayList<>(newList.subList(0, expectedListSize))); + } + // schedule a new get from region server task + executor.schedule(this::getFromRegionServer, requestRegionServerIntervalSecs, TimeUnit.SECONDS); + } + + public void stop() { + executor.shutdownNow(); + } + + public List getBootstrapNodes() { + return nodes; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 56e21913d4f..0e77bfbf5c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -181,7 +181,6 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import 
org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; -import org.apache.hadoop.hbase.zookeeper.RegionServerAddressTracker; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -432,10 +431,6 @@ public class HRegionServer extends Thread implements * entries. Used for serving ClientMetaService. */ private final MetaRegionLocationCache metaRegionLocationCache; - /** - * Cache for all the region servers in the cluster. Used for serving ClientMetaService. - */ - private final RegionServerAddressTracker regionServerAddressTracker; // Cluster Status Tracker protected final ClusterStatusTracker clusterStatusTracker; @@ -567,6 +562,8 @@ public class HRegionServer extends Thread implements */ private NamedQueueRecorder namedQueueRecorder = null; + private BootstrapNodeManager bootstrapNodeManager; + /** * True if this RegionServer is coming up in a cluster where there is no Master; * means it needs to just come up and make do without a Master to talk to: e.g. in test or @@ -688,12 +685,6 @@ public class HRegionServer extends Thread implements } this.rpcServices.start(zooKeeper); this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper); - if (!(this instanceof HMaster)) { - // do not create this field for HMaster, we have another region server tracker for HMaster. - this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this); - } else { - this.regionServerAddressTracker = null; - } // This violates 'no starting stuff in Constructor' but Master depends on the below chore // and executor being created and takes a different startup route. Lots of overlap between HRS // and M (An M IS A HRS now). 
Need to refactor so less duplication between M and its super @@ -923,6 +914,9 @@ public class HRegionServer extends Thread implements try { initializeZooKeeper(); setupClusterConnection(); + if (!(this instanceof HMaster)) { + bootstrapNodeManager = new BootstrapNodeManager(clusterConnection, masterAddressTracker); + } // Setup RPC client for master communication this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); @@ -2683,7 +2677,9 @@ public class HRegionServer extends Thread implements // TODO: cancel will not cleanup the chores, so we need make sure we do not miss any choreService.shutdown(); } - + if (bootstrapNodeManager != null) { + bootstrapNodeManager.stop(); + } if (this.cacheFlusher != null) { this.cacheFlusher.join(); } @@ -3968,8 +3964,8 @@ public class HRegionServer extends Thread implements return masterAddressTracker.getBackupMasters(); } - public Iterator getRegionServers() { - return regionServerAddressTracker.getRegionServers().iterator(); + public Iterator getBootstrapNodes() { + return bootstrapNodeManager.getBootstrapNodes().iterator(); } public MetaRegionLocationCache getMetaRegionLocationCache() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 18a8c527bca..5f99d6cc6af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -218,6 +218,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; @@ -280,9 +283,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe */ @InterfaceAudience.Private @SuppressWarnings("deprecation") -public class RSRpcServices implements HBaseRPCErrorHandler, - AdminService.BlockingInterface, ClientService.BlockingInterface, - ClientMetaService.BlockingInterface, PriorityFunction, ConfigurationObserver { +public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.BlockingInterface, + ClientService.BlockingInterface, ClientMetaService.BlockingInterface, + BootstrapNodeService.BlockingInterface, PriorityFunction, ConfigurationObserver { protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); /** RPC scheduler to use for the region server. 
*/ @@ -4076,11 +4079,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler, int maxNodeCount = regionServer.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT, DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT); ReservoirSample sample = new ReservoirSample<>(maxNodeCount); - sample.add(regionServer.getRegionServers()); + sample.add(regionServer.getBootstrapNodes()); GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder(); sample.getSamplingResult().stream().map(ProtobufUtil::toServerName) .forEach(builder::addServerName); return builder.build(); } + + @Override + public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller, + GetAllBootstrapNodesRequest request) throws ServiceException { + GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder(); + regionServer.getBootstrapNodes() + .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server))); + return builder.build(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java index e9d1cb41c77..32cc4bf7897 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.BootstrapNodeManager; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -61,6 +62,7 @@ public class TestRpcConnectionRegistry { UTIL.getConfiguration().setLong(RpcConnectionRegistry.INITIAL_REFRESH_DELAY_SECS, 1); UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1); UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0); + UTIL.getConfiguration().setLong(BootstrapNodeManager.REQUEST_MASTER_MIN_INTERVAL_SECS, 1); UTIL.startMiniCluster(3); HBaseTestingUtility.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBootstrapNodeManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBootstrapNodeManager.java new file mode 100644 index 00000000000..c415fc75018 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBootstrapNodeManager.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestBootstrapNodeManager { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBootstrapNodeManager.class); + + private Configuration conf; + + private ClusterConnection conn; + + private MasterAddressTracker tracker; + + private BootstrapNodeManager manager; + + @Before + public void setUp() { + conf = HBaseConfiguration.create(); + conf.setLong(BootstrapNodeManager.REQUEST_MASTER_INTERVAL_SECS, 5); + conf.setLong(BootstrapNodeManager.REQUEST_MASTER_MIN_INTERVAL_SECS, 1); + conf.setLong(BootstrapNodeManager.REQUEST_REGIONSERVER_INTERVAL_SECS, 1); + conf.setInt(RSRpcServices.CLIENT_BOOTSTRAP_NODE_LIMIT, 2); + conn = mock(ClusterConnection.class); + when(conn.getConfiguration()).thenReturn(conf); + tracker = mock(MasterAddressTracker.class); + } + + @After + public void tearDown() { + if (manager != null) { + manager.stop(); + } + } + + private void assertListEquals(List<ServerName> expected, List<ServerName> actual) { + assertEquals(expected.size(), actual.size()); + assertThat(actual, hasItems(expected.toArray(new ServerName[0]))); + } + + @Test + public void testNormal() throws Exception { + List<ServerName> regionServers = + Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()), + ServerName.valueOf("server2", 12345, EnvironmentEdgeManager.currentTime()), + ServerName.valueOf("server3", 12345, EnvironmentEdgeManager.currentTime()), + ServerName.valueOf("server4", 12345, EnvironmentEdgeManager.currentTime())); + when(conn.getLiveRegionServers(any(), anyInt())).thenReturn(regionServers); + when(conn.getAllBootstrapNodes(any())).thenReturn(regionServers); + manager = new BootstrapNodeManager(conn, tracker); + Thread.sleep(3000); + verify(conn, times(1)).getLiveRegionServers(any(), anyInt()); + verify(conn, atLeastOnce()).getAllBootstrapNodes(any()); + assertListEquals(regionServers, manager.getBootstrapNodes()); + } + + // if we do not return enough
region servers, we will always get from master + @Test + public void testOnlyMaster() throws Exception { + List regionServers = + Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime())); + when(conn.getLiveRegionServers(any(), anyInt())).thenReturn(regionServers); + when(conn.getAllBootstrapNodes(any())).thenReturn(regionServers); + manager = new BootstrapNodeManager(conn, tracker); + Thread.sleep(3000); + verify(conn, atLeast(2)).getLiveRegionServers(any(), anyInt()); + verify(conn, never()).getAllBootstrapNodes(any()); + assertListEquals(regionServers, manager.getBootstrapNodes()); + } + + @Test + public void testRegionServerError() throws Exception { + List regionServers = + Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()), + ServerName.valueOf("server2", 12345, EnvironmentEdgeManager.currentTime()), + ServerName.valueOf("server3", 12345, EnvironmentEdgeManager.currentTime()), + ServerName.valueOf("server4", 12345, EnvironmentEdgeManager.currentTime())); + List newRegionServers = + Arrays.asList(ServerName.valueOf("server5", 12345, EnvironmentEdgeManager.currentTime()), + ServerName.valueOf("server6", 12345, EnvironmentEdgeManager.currentTime())); + when(conn.getLiveRegionServers(any(), anyInt())).thenReturn(regionServers); + when(conn.getAllBootstrapNodes(any())).thenAnswer(invocation -> { + if (invocation.getArgument(0, ServerName.class).getHostname().equals("server4")) { + throw new IOException("Inject error"); + } else { + return regionServers.subList(0, 3); + } + }); + manager = new BootstrapNodeManager(conn, tracker); + // we should remove server4 from the list + Waiter.waitFor(conf, 30000, () -> manager.getBootstrapNodes().size() == 3); + assertListEquals(regionServers.subList(0, 3), manager.getBootstrapNodes()); + when(conn.getLiveRegionServers(any(), anyInt())).thenReturn(newRegionServers); + doAnswer(invocation -> { + String hostname = invocation.getArgument(0, ServerName.class).getHostname(); + switch (hostname) { + case "server1": + return regionServers.subList(0, 1); + case "server2": + case "server3": + throw new IOException("Inject error"); + default: + return newRegionServers; + } + }).when(conn).getAllBootstrapNodes(any()); + // we should remove server2, server3 from the list and then get the new list from master again + Waiter.waitFor(conf, 30000, () -> { + List bootstrapNodes = manager.getBootstrapNodes(); + if (bootstrapNodes.size() != 2) { + return false; + } + String hostname = bootstrapNodes.get(0).getHostname(); + return hostname.equals("server5") || hostname.equals("server6"); + }); + assertListEquals(newRegionServers, manager.getBootstrapNodes()); + } +} diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerAddressTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerAddressTracker.java deleted file mode 100644 index e478639737f..00000000000 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerAddressTracker.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.zookeeper; - -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; - -/** - * Class for tracking the region servers for a cluster. - */ -@InterfaceAudience.Private -public class RegionServerAddressTracker extends ZKListener { - - private static final Logger LOG = LoggerFactory.getLogger(RegionServerAddressTracker.class); - - private volatile List regionServers = Collections.emptyList(); - - private final Abortable abortable; - - public RegionServerAddressTracker(ZKWatcher watcher, Abortable abortable) { - super(watcher); - this.abortable = abortable; - watcher.registerListener(this); - loadRegionServerList(); - } - - private void loadRegionServerList() { - List names; - try { - names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode); - } catch (KeeperException e) { - LOG.error("failed to list region servers", e); - abortable.abort("failed to list region servers", e); - return; - } - if (CollectionUtils.isEmpty(names)) { - regionServers = Collections.emptyList(); - } else { - regionServers = names.stream().map(ServerName::parseServerName) - .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList)); - } - } - - @Override - public void nodeChildrenChanged(String path) { - if (path.equals(watcher.getZNodePaths().rsZNode)) { - loadRegionServerList(); - } - } - - public List getRegionServers() { - return regionServers; - } -} diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionServerAddressTracker.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionServerAddressTracker.java deleted file mode 100644 index 6d2cc2b419d..00000000000 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionServerAddressTracker.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.zookeeper; - -import static org.hamcrest.CoreMatchers.hasItems; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseZKTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ZKTests; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.zookeeper.KeeperException; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.io.Closeables; - -@Category({ ZKTests.class, MediumTests.class }) -public class TestRegionServerAddressTracker { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestRegionServerAddressTracker.class); - - private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerAddressTracker.class); - - private static final HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility(); - - private ZKWatcher zk; - - private RegionServerAddressTracker tracker; - - @Rule - public TestName name = new TestName(); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TEST_UTIL.startMiniZKCluster(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniZKCluster(); - } - - @Before - public void setUp() throws ZooKeeperConnectionException, IOException, KeeperException { - TEST_UTIL.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + name.getMethodName()); - zk = new ZKWatcher(TEST_UTIL.getConfiguration(), name.getMethodName(), null); - ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode); - tracker = new RegionServerAddressTracker(zk, new WarnOnlyAbortable()); - } - - @After - public void tearDown() throws IOException { - Closeables.close(zk, true); - } - - @Test - public void test() throws KeeperException { - ServerName rs1 = ServerName.valueOf("127.0.0.1", 16000, EnvironmentEdgeManager.currentTime()); - ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs1.toString())); - TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1); - assertEquals(rs1, tracker.getRegionServers().get(0)); - - ServerName rs2 = ServerName.valueOf("127.0.0.2", 16000, EnvironmentEdgeManager.currentTime()); - ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs2.toString())); - TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 2); - assertThat(tracker.getRegionServers(), hasItems(rs1, rs2)); - - ZKUtil.deleteNode(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs1.toString())); - TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1); - assertEquals(rs2, tracker.getRegionServers().get(0)); - - ZKUtil.deleteNode(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs2.toString())); - TEST_UTIL.waitFor(10000, () -> 
tracker.getRegionServers().isEmpty()); - } - - private static final class WarnOnlyAbortable implements Abortable { - @Override - public void abort(String why, Throwable e) { - LOG.warn("RegionServerAddressTracker received abort, ignoring. Reason: {}", why, e); - } - - @Override - public boolean isAborted() { - return false; - } - } -}
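
To make the scheduling policy in BootstrapNodeManager above easier to follow, here is a compact, self-contained sketch of the same idea. It is a simplified model under stated assumptions, not the patched class: the NodeSource interface and the plain String node names are stand-ins for the master RPC (getLiveRegionServers) and the peer RPC (getAllBootstrapNodes) that the patch exposes through ClusterConnection, and the jittered delays and exponential backoff of the real class are reduced to fixed intervals.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class BootstrapRefreshSketch {

  /** Stand-in for the master and peer RPCs used by the real manager. */
  public interface NodeSource {
    List<String> liveServersFromMaster(int count) throws Exception;
    List<String> bootstrapNodesFromPeer(String peer) throws Exception;
  }

  private final NodeSource source;
  private final int maxNodeCount;
  private final long masterIntervalMillis; // how long we tolerate not asking the master
  private final long peerIntervalMillis;   // how often we exchange lists with a random peer
  private final ScheduledExecutorService executor =
    Executors.newSingleThreadScheduledExecutor();

  private volatile List<String> nodes = Collections.emptyList();
  private long lastMasterTime;

  public BootstrapRefreshSketch(NodeSource source, int maxNodeCount, long masterIntervalMillis,
    long peerIntervalMillis) {
    this.source = source;
    this.maxNodeCount = maxNodeCount;
    this.masterIntervalMillis = masterIntervalMillis;
    this.peerIntervalMillis = peerIntervalMillis;
    executor.execute(this::refreshFromMaster);
  }

  private void refreshFromMaster() {
    try {
      // Ask the master for roughly twice the number of nodes we want to keep.
      List<String> live = source.liveServersFromMaster(maxNodeCount * 2);
      nodes = Collections.unmodifiableList(new ArrayList<>(live));
      lastMasterTime = System.currentTimeMillis();
      if (live.size() < maxNodeCount) {
        // Small cluster: keep asking the master, it is cheap at this scale.
        executor.schedule(this::refreshFromMaster, peerIntervalMillis, TimeUnit.MILLISECONDS);
      } else {
        // Enough seeds: switch to exchanging lists with random peers.
        executor.schedule(this::refreshFromPeer, peerIntervalMillis, TimeUnit.MILLISECONDS);
      }
    } catch (Exception e) {
      // The real class uses a RetryCounter with exponential backoff here.
      executor.schedule(this::refreshFromMaster, peerIntervalMillis, TimeUnit.MILLISECONDS);
    }
  }

  private void refreshFromPeer() {
    if (System.currentTimeMillis() - lastMasterTime >= masterIntervalMillis) {
      // Too long since we last talked to the master: go back to it.
      executor.execute(this::refreshFromMaster);
      return;
    }
    List<String> current = nodes;
    String peer = current.get(ThreadLocalRandom.current().nextInt(current.size()));
    try {
      // Union our list with the peer's list, shuffle, and cap at 2 * maxNodeCount entries.
      Set<String> union = new HashSet<>(current);
      union.addAll(source.bootstrapNodesFromPeer(peer));
      List<String> merged = new ArrayList<>(union);
      Collections.shuffle(merged, ThreadLocalRandom.current());
      int cap = Math.min(merged.size(), maxNodeCount * 2);
      nodes = Collections.unmodifiableList(new ArrayList<>(merged.subList(0, cap)));
      executor.schedule(this::refreshFromPeer, peerIntervalMillis, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
      // Unreachable peer: drop it, and fall back to the master if the list gets too small.
      List<String> remaining = new ArrayList<>(current);
      remaining.remove(peer);
      nodes = Collections.unmodifiableList(remaining);
      if (remaining.size() < maxNodeCount) {
        executor.execute(this::refreshFromMaster);
      } else {
        executor.schedule(this::refreshFromPeer, peerIntervalMillis, TimeUnit.MILLISECONDS);
      }
    }
  }

  public List<String> getBootstrapNodes() {
    return nodes;
  }

  public void stop() {
    executor.shutdownNow();
  }
}

The key property, mirrored from the patch, is that the manager only leans on the master when its node list is small or stale, so large clusters converge to pure region-server-to-region-server exchange while small clusters keep polling a master that can easily absorb the extra requests.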
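
The refresh cadence is controlled by the three new configuration keys introduced above (defaults: 600 seconds between forced master requests, 30 seconds minimum between master requests, and 30 seconds between region server exchanges). Below is a minimal snippet, mirroring TestBootstrapNodeManager#setUp, that shortens them for experimentation; it assumes the patched hbase-server classes are on the classpath, and the BootstrapNodeTuningExample class name is only for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.BootstrapNodeManager;

public class BootstrapNodeTuningExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Defaults are 600s / 30s / 30s; shrink them so refreshes happen within seconds.
    conf.setLong(BootstrapNodeManager.REQUEST_MASTER_INTERVAL_SECS, 5);
    conf.setLong(BootstrapNodeManager.REQUEST_MASTER_MIN_INTERVAL_SECS, 1);
    conf.setLong(BootstrapNodeManager.REQUEST_REGIONSERVER_INTERVAL_SECS, 1);
    System.out.println("request master interval (secs): "
      + conf.getLong(BootstrapNodeManager.REQUEST_MASTER_INTERVAL_SECS, 600));
  }
}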