HBASE-25931 Move FavoredNodeManager to hbase-balancer module (#3324)
Signed-off-by: Yulin Niu <niuyulin@apache.org> Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
ed8df5eded
commit
7218c83f81
|
@ -88,6 +88,14 @@
|
|||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
|
|
|
@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
|
|||
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
|
||||
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
|
||||
|
||||
import com.google.errorprone.annotations.RestrictedApi;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -34,9 +35,8 @@ import java.util.stream.Collectors;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RackManager;
|
||||
import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
|
||||
import org.apache.hadoop.hbase.master.balancer.ClusterInfoProvider;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
@ -49,17 +49,18 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
|||
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||
|
||||
/**
|
||||
* FavoredNodesManager is responsible for maintaining favored nodes info in internal cache and
|
||||
* META table. Its the centralized store for all favored nodes information. All reads and updates
|
||||
* should be done through this class. There should only be one instance of
|
||||
* {@link FavoredNodesManager} in Master. {@link FavoredNodesPlan} and favored node information
|
||||
* from {@link SnapshotOfRegionAssignmentFromMeta} should not be used outside this class (except
|
||||
* for tools that only read or fortest cases). All other classes including Favored balancers
|
||||
* and {@link FavoredNodeAssignmentHelper} should use {@link FavoredNodesManager} for any
|
||||
* FavoredNodesManager is responsible for maintaining favored nodes info in internal cache and META
|
||||
* table. Its the centralized store for all favored nodes information. All reads and updates should
|
||||
* be done through this class. There should only be one instance of {@link FavoredNodesManager} in
|
||||
* Master. {@link FavoredNodesPlan} and favored node information from
|
||||
* {@link SnapshotOfRegionAssignmentFromMeta} should not be used outside this class (except for
|
||||
* tools that only read or fortest cases). All other classes including Favored balancers and
|
||||
* {@link FavoredNodeAssignmentHelper} should use {@link FavoredNodesManager} for any
|
||||
* read/write/deletes to favored nodes.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class FavoredNodesManager {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(FavoredNodesManager.class);
|
||||
|
||||
private final FavoredNodesPlan globalFavoredNodesAssignmentPlan;
|
||||
|
@ -67,21 +68,19 @@ public class FavoredNodesManager {
|
|||
private final Map<ServerName, List<RegionInfo>> secondaryRSToRegionMap;
|
||||
private final Map<ServerName, List<RegionInfo>> teritiaryRSToRegionMap;
|
||||
|
||||
private final MasterServices masterServices;
|
||||
private final RackManager rackManager;
|
||||
private final ClusterInfoProvider provider;
|
||||
|
||||
/**
|
||||
* Datanode port to be used for Favored Nodes.
|
||||
*/
|
||||
private int datanodeDataTransferPort;
|
||||
|
||||
public FavoredNodesManager(MasterServices masterServices) {
|
||||
this.masterServices = masterServices;
|
||||
public FavoredNodesManager(ClusterInfoProvider provider) {
|
||||
this.provider = provider;
|
||||
this.globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
|
||||
this.primaryRSToRegionMap = new HashMap<>();
|
||||
this.secondaryRSToRegionMap = new HashMap<>();
|
||||
this.teritiaryRSToRegionMap = new HashMap<>();
|
||||
this.rackManager = new RackManager(masterServices.getConfiguration());
|
||||
}
|
||||
|
||||
public synchronized void initialize(SnapshotOfRegionAssignmentFromMeta snapshot) {
|
||||
|
@ -96,10 +95,12 @@ public class FavoredNodesManager {
|
|||
datanodeDataTransferPort= getDataNodePort();
|
||||
}
|
||||
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|FavoredNodesManager).java")
|
||||
public int getDataNodePort() {
|
||||
HdfsConfiguration.init();
|
||||
|
||||
Configuration dnConf = new HdfsConfiguration(masterServices.getConfiguration());
|
||||
Configuration dnConf = new HdfsConfiguration(provider.getConfiguration());
|
||||
|
||||
int dnPort = NetUtils.createSocketAddr(
|
||||
dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
|
||||
|
@ -112,7 +113,7 @@ public class FavoredNodesManager {
|
|||
return this.globalFavoredNodesAssignmentPlan.getFavoredNodes(regionInfo);
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Favored nodes are not applicable for system tables. We will use this to check before
|
||||
* we apply any favored nodes logic on a region.
|
||||
*/
|
||||
|
@ -128,7 +129,7 @@ public class FavoredNodesManager {
|
|||
return regions.stream().filter(r -> !isFavoredNodeApplicable(r)).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* This should only be used when sending FN information to the region servers. Instead of
|
||||
* sending the region server port, we use the datanode port. This helps in centralizing the DN
|
||||
* port logic in Master. The RS uses the port from the favored node list as hints.
|
||||
|
@ -148,7 +149,6 @@ public class FavoredNodesManager {
|
|||
|
||||
public synchronized void updateFavoredNodes(Map<RegionInfo, List<ServerName>> regionFNMap)
|
||||
throws IOException {
|
||||
|
||||
Map<RegionInfo, List<ServerName>> regionToFavoredNodes = new HashMap<>();
|
||||
for (Map.Entry<RegionInfo, List<ServerName>> entry : regionFNMap.entrySet()) {
|
||||
RegionInfo regionInfo = entry.getKey();
|
||||
|
@ -187,7 +187,7 @@ public class FavoredNodesManager {
|
|||
|
||||
// Lets do a bulk update to meta since that reduces the RPC's
|
||||
FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
|
||||
masterServices.getConnection());
|
||||
provider.getConnection());
|
||||
deleteFavoredNodesForRegions(regionToFavoredNodes.keySet());
|
||||
|
||||
for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
|
||||
|
@ -227,13 +227,13 @@ public class FavoredNodesManager {
|
|||
teritiaryRSToRegionMap.put(serverToUse, regionList);
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Get the replica count for the servers provided.
|
||||
*
|
||||
* For each server, replica count includes three counts for primary, secondary and tertiary.
|
||||
* If a server is the primary favored node for 10 regions, secondary for 5 and tertiary
|
||||
* for 1, then the list would be [10, 5, 1]. If the server is newly added to the cluster is
|
||||
* not a favored node for any region, the replica count would be [0, 0, 0].
|
||||
* <p/>
|
||||
* For each server, replica count includes three counts for primary, secondary and tertiary. If a
|
||||
* server is the primary favored node for 10 regions, secondary for 5 and tertiary for 1, then the
|
||||
* list would be [10, 5, 1]. If the server is newly added to the cluster is not a favored node for
|
||||
* any region, the replica count would be [0, 0, 0].
|
||||
*/
|
||||
public synchronized Map<ServerName, List<Integer>> getReplicaLoad(List<ServerName> servers) {
|
||||
Map<ServerName, List<Integer>> result = Maps.newHashMap();
|
||||
|
@ -297,8 +297,4 @@ public class FavoredNodesManager {
|
|||
}
|
||||
return regionInfos;
|
||||
}
|
||||
|
||||
public RackManager getRackManager() {
|
||||
return rackManager;
|
||||
}
|
||||
}
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.BalancerDecision;
|
||||
import org.apache.hadoop.hbase.client.BalancerRejection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
|
@ -47,6 +48,11 @@ public interface ClusterInfoProvider extends ConfigurationObserver {
|
|||
*/
|
||||
Configuration getConfiguration();
|
||||
|
||||
/**
|
||||
* Returns a reference to the cluster's connection.
|
||||
*/
|
||||
Connection getConnection();
|
||||
|
||||
/**
|
||||
* Get all the regions of this cluster.
|
||||
* <p/>
|
||||
|
|
|
@ -31,12 +31,13 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.BalancerDecision;
|
||||
import org.apache.hadoop.hbase.client.BalancerRejection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
|
||||
public class DummyClusterInfoProvider implements ClusterInfoProvider {
|
||||
|
||||
private final Configuration conf;
|
||||
private volatile Configuration conf;
|
||||
|
||||
public DummyClusterInfoProvider(Configuration conf) {
|
||||
this.conf = conf;
|
||||
|
@ -47,6 +48,11 @@ public class DummyClusterInfoProvider implements ClusterInfoProvider {
|
|||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getConnection() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RegionInfo> getAssignedRegions() {
|
||||
return Collections.emptyList();
|
||||
|
@ -99,5 +105,6 @@ public class DummyClusterInfoProvider implements ClusterInfoProvider {
|
|||
|
||||
@Override
|
||||
public void onConfigurationChange(Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -118,8 +118,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
Set<RegionInfo> regionSet = new HashSet<>(regions);
|
||||
Map<ServerName, List<RegionInfo>> assignmentMap = new HashMap<>();
|
||||
try {
|
||||
FavoredNodeAssignmentHelper helper =
|
||||
new FavoredNodeAssignmentHelper(servers, fnm.getRackManager());
|
||||
FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
|
||||
helper.initialize();
|
||||
|
||||
Set<RegionInfo> systemRegions = FavoredNodesManager.filterNonFNApplicableRegions(regionSet);
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TableDescriptors;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.BalancerDecision;
|
||||
import org.apache.hadoop.hbase.client.BalancerRejection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
|
@ -88,6 +89,11 @@ public class MasterClusterInfoProvider implements ClusterInfoProvider {
|
|||
return services.getConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getConnection() {
|
||||
return services.getConnection();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RegionInfo> getAssignedRegions() {
|
||||
AssignmentManager am = services.getAssignmentManager();
|
||||
|
@ -169,5 +175,4 @@ public class MasterClusterInfoProvider implements ClusterInfoProvider {
|
|||
NamedQueueRecorder getNamedQueueRecorder() {
|
||||
return namedQueueRecorder;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -355,7 +355,7 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
|||
internalBalancer.setClusterInfoProvider(provider);
|
||||
// special handling for favor node balancers
|
||||
if (internalBalancer instanceof FavoredNodesPromoter) {
|
||||
favoredNodesManager = new FavoredNodesManager(masterServices);
|
||||
favoredNodesManager = new FavoredNodesManager(provider);
|
||||
if (internalBalancer instanceof FavoredNodeLoadBalancer) {
|
||||
((FavoredNodeLoadBalancer) internalBalancer).setMasterServices(masterServices);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue