HBASE-25916 Move FavoredNodeLoadBalancer to hbase-balancer module (#3327)
Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
parent
1ccba10847
commit
06c6e06803
|
@ -48,6 +48,9 @@ import org.apache.hadoop.hbase.client.Table;
|
|||
import org.apache.hadoop.hbase.master.RackManager;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -814,7 +817,7 @@ public class FavoredNodeAssignmentHelper {
|
|||
return generatedFavNodes;
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Get the rack of server from local mapping when present, saves lookup by the RackManager.
|
||||
*/
|
||||
private String getRackOfServer(ServerName sn) {
|
||||
|
@ -826,4 +829,13 @@ public class FavoredNodeAssignmentHelper {
|
|||
return rack;
|
||||
}
|
||||
}
|
||||
|
||||
public static int getDataNodePort(Configuration conf) {
|
||||
HdfsConfiguration.init();
|
||||
Configuration dnConf = new HdfsConfiguration(conf);
|
||||
int dnPort = NetUtils.createSocketAddr(dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
|
||||
LOG.debug("Loaded default datanode port for FN: {}", dnPort);
|
||||
return dnPort;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,9 +36,7 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -67,17 +65,11 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
|||
public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements FavoredNodesPromoter {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeLoadBalancer.class);
|
||||
|
||||
private MasterServices services;
|
||||
private FavoredNodesManager fnm;
|
||||
|
||||
public void setMasterServices(MasterServices services) {
|
||||
this.services = services;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize() {
|
||||
super.initialize();
|
||||
this.fnm = services.getFavoredNodesManager();
|
||||
public void setFavoredNodesManager(FavoredNodesManager fnm) {
|
||||
this.fnm = fnm;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -86,8 +78,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
|
|||
// TODO. Look at is whether Stochastic loadbalancer can be integrated with this
|
||||
List<RegionPlan> plans = new ArrayList<>();
|
||||
Map<ServerName, ServerName> serverNameWithoutCodeToServerName = new HashMap<>();
|
||||
ServerManager serverMgr = services.getServerManager();
|
||||
for (ServerName sn : serverMgr.getOnlineServersList()) {
|
||||
for (ServerName sn : provider.getOnlineServersList()) {
|
||||
ServerName s = ServerName.valueOf(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE);
|
||||
// FindBugs complains about useless store! serverNameToServerNameWithoutCode.put(sn, s);
|
||||
serverNameWithoutCodeToServerName.put(s, sn);
|
||||
|
@ -106,9 +97,8 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
|
|||
if (favoredNodes == null || favoredNodes.get(0).equals(currentServerWithoutStartCode)) {
|
||||
continue; // either favorednodes does not exist or we are already on the primary node
|
||||
}
|
||||
ServerName destination = null;
|
||||
// check whether the primary is available
|
||||
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
|
||||
ServerName destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
|
||||
if (destination == null) {
|
||||
// check whether the region is on secondary/tertiary
|
||||
if (currentServerWithoutStartCode.equals(favoredNodes.get(1))
|
||||
|
@ -117,10 +107,10 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
|
|||
}
|
||||
// the region is currently on none of the favored nodes
|
||||
// get it on one of them if possible
|
||||
ServerMetrics l1 = services.getServerManager()
|
||||
.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
|
||||
ServerMetrics l2 = services.getServerManager()
|
||||
.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
|
||||
ServerMetrics l1 =
|
||||
provider.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
|
||||
ServerMetrics l2 =
|
||||
provider.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
|
||||
if (l1 != null && l2 != null) {
|
||||
if (l1.getRegionMetrics().size() > l2.getRegionMetrics().size()) {
|
||||
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
|
||||
|
@ -176,8 +166,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
|
|||
Map<ServerName,List<RegionInfo>> regionsWithFavoredNodesMap = segregatedRegions.getFirst();
|
||||
List<RegionInfo> regionsWithNoFavoredNodes = segregatedRegions.getSecond();
|
||||
assignmentMap = new HashMap<>();
|
||||
roundRobinAssignmentImpl(assignmentHelper, assignmentMap, regionsWithNoFavoredNodes,
|
||||
servers);
|
||||
roundRobinAssignmentImpl(assignmentHelper, assignmentMap, regionsWithNoFavoredNodes);
|
||||
// merge the assignment maps
|
||||
assignmentMap.putAll(regionsWithFavoredNodesMap);
|
||||
} catch (Exception ex) {
|
||||
|
@ -283,8 +272,8 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
|
|||
// assign the region to the one with a lower load
|
||||
// (both have the desired hdfs blocks)
|
||||
ServerName s;
|
||||
ServerMetrics tertiaryLoad = services.getServerManager().getLoad(tertiaryHost);
|
||||
ServerMetrics secondaryLoad = services.getServerManager().getLoad(secondaryHost);
|
||||
ServerMetrics tertiaryLoad = provider.getLoad(tertiaryHost);
|
||||
ServerMetrics secondaryLoad = provider.getLoad(secondaryHost);
|
||||
if (secondaryLoad.getRegionMetrics().size() < tertiaryLoad.getRegionMetrics().size()) {
|
||||
s = secondaryHost;
|
||||
} else {
|
||||
|
@ -314,8 +303,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
|
|||
}
|
||||
|
||||
private void roundRobinAssignmentImpl(FavoredNodeAssignmentHelper assignmentHelper,
|
||||
Map<ServerName, List<RegionInfo>> assignmentMap,
|
||||
List<RegionInfo> regions, List<ServerName> servers) throws IOException {
|
||||
Map<ServerName, List<RegionInfo>> assignmentMap, List<RegionInfo> regions) throws IOException {
|
||||
Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
|
||||
// figure the primary RSs
|
||||
assignmentHelper.placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
|
|
@ -19,11 +19,11 @@ package org.apache.hadoop.hbase.favored;
|
|||
|
||||
import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
|
||||
import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
|
||||
import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.getDataNodePort;
|
||||
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
|
||||
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
|
||||
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
|
||||
|
||||
import com.google.errorprone.annotations.RestrictedApi;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -32,17 +32,11 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
|
||||
import org.apache.hadoop.hbase.master.balancer.ClusterInfoProvider;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||
|
@ -61,8 +55,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
|||
@InterfaceAudience.Private
|
||||
public class FavoredNodesManager {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(FavoredNodesManager.class);
|
||||
|
||||
private final FavoredNodesPlan globalFavoredNodesAssignmentPlan;
|
||||
private final Map<ServerName, List<RegionInfo>> primaryRSToRegionMap;
|
||||
private final Map<ServerName, List<RegionInfo>> secondaryRSToRegionMap;
|
||||
|
@ -83,30 +75,21 @@ public class FavoredNodesManager {
|
|||
this.teritiaryRSToRegionMap = new HashMap<>();
|
||||
}
|
||||
|
||||
public synchronized void initialize(SnapshotOfRegionAssignmentFromMeta snapshot) {
|
||||
public void initializeFromMeta() throws IOException {
|
||||
SnapshotOfRegionAssignmentFromMeta snapshot =
|
||||
new SnapshotOfRegionAssignmentFromMeta(provider.getConnection());
|
||||
snapshot.initialize();
|
||||
// Add snapshot to structures made on creation. Current structures may have picked
|
||||
// up data between construction and the scan of meta needed before this method
|
||||
// is called. See HBASE-23737 "[Flakey Tests] TestFavoredNodeTableImport fails 30% of the time"
|
||||
this.globalFavoredNodesAssignmentPlan.
|
||||
updateFavoredNodesMap(snapshot.getExistingAssignmentPlan());
|
||||
primaryRSToRegionMap.putAll(snapshot.getPrimaryToRegionInfoMap());
|
||||
secondaryRSToRegionMap.putAll(snapshot.getSecondaryToRegionInfoMap());
|
||||
teritiaryRSToRegionMap.putAll(snapshot.getTertiaryToRegionInfoMap());
|
||||
datanodeDataTransferPort= getDataNodePort();
|
||||
}
|
||||
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|FavoredNodesManager).java")
|
||||
public int getDataNodePort() {
|
||||
HdfsConfiguration.init();
|
||||
|
||||
Configuration dnConf = new HdfsConfiguration(provider.getConfiguration());
|
||||
|
||||
int dnPort = NetUtils.createSocketAddr(
|
||||
dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
|
||||
LOG.debug("Loaded default datanode port for FN: " + datanodeDataTransferPort);
|
||||
return dnPort;
|
||||
// is called. See HBASE-23737 "[Flakey Tests] TestFavoredNodeTableImport fails 30% of the time"
|
||||
synchronized (this) {
|
||||
this.globalFavoredNodesAssignmentPlan
|
||||
.updateFavoredNodesMap(snapshot.getExistingAssignmentPlan());
|
||||
primaryRSToRegionMap.putAll(snapshot.getPrimaryToRegionInfoMap());
|
||||
secondaryRSToRegionMap.putAll(snapshot.getSecondaryToRegionInfoMap());
|
||||
teritiaryRSToRegionMap.putAll(snapshot.getTertiaryToRegionInfoMap());
|
||||
datanodeDataTransferPort = getDataNodePort(provider.getConfiguration());
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
|
||||
|
|
|
@ -37,4 +37,6 @@ public interface FavoredNodesPromoter {
|
|||
throws IOException;
|
||||
|
||||
List<ServerName> getFavoredNodes(RegionInfo regionInfo);
|
||||
|
||||
void setFavoredNodesManager(FavoredNodesManager fnm);
|
||||
}
|
||||
|
|
|
@ -60,6 +60,11 @@ public interface ClusterInfoProvider extends ConfigurationObserver {
|
|||
*/
|
||||
List<RegionInfo> getAssignedRegions();
|
||||
|
||||
/**
|
||||
* Unassign the given region.
|
||||
*/
|
||||
void unassign(RegionInfo regionInfo) throws IOException;
|
||||
|
||||
/**
|
||||
* Get the table descriptor for the given table.
|
||||
*/
|
||||
|
@ -83,6 +88,11 @@ public interface ClusterInfoProvider extends ConfigurationObserver {
|
|||
*/
|
||||
boolean hasRegionReplica(Collection<RegionInfo> regions) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns a copy of the internal list of online servers.
|
||||
*/
|
||||
List<ServerName> getOnlineServersList();
|
||||
|
||||
/**
|
||||
* Returns a copy of the internal list of online servers matched by the given {@code filter}.
|
||||
*/
|
||||
|
@ -110,4 +120,9 @@ public interface ClusterInfoProvider extends ConfigurationObserver {
|
|||
* Record the given balancer rejection.
|
||||
*/
|
||||
void recordBalancerRejection(Supplier<BalancerRejection> rejection);
|
||||
|
||||
/**
|
||||
* Returns server metrics of the given server if serverName is known else null
|
||||
*/
|
||||
ServerMetrics getLoad(ServerName serverName);
|
||||
}
|
||||
|
|
|
@ -27,12 +27,10 @@ import edu.umd.cs.findbugs.annotations.NonNull;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -46,7 +44,6 @@ import org.apache.hadoop.hbase.favored.FavoredNodesManager;
|
|||
import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
|
||||
import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
|
||||
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -77,12 +74,12 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
FavoredNodesPromoter {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(FavoredStochasticBalancer.class);
|
||||
private FavoredNodesManager fnm;
|
||||
private MasterServices services;
|
||||
|
||||
public void setMasterServices(MasterServices services) {
|
||||
this.services = services;
|
||||
this.fnm = services.getFavoredNodesManager();
|
||||
private FavoredNodesManager fnm;
|
||||
|
||||
@Override
|
||||
public void setFavoredNodesManager(FavoredNodesManager fnm) {
|
||||
this.fnm = fnm;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -112,11 +109,11 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
|
||||
List<ServerName> servers) throws HBaseIOException {
|
||||
metricsBalancer.incrMiscInvocations();
|
||||
Map<ServerName, List<RegionInfo>> assignmentMap = new HashMap<>();
|
||||
if (regions.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
return assignmentMap;
|
||||
}
|
||||
Set<RegionInfo> regionSet = new HashSet<>(regions);
|
||||
Map<ServerName, List<RegionInfo>> assignmentMap = new HashMap<>();
|
||||
try {
|
||||
FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
|
||||
helper.initialize();
|
||||
|
@ -154,7 +151,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
return;
|
||||
}
|
||||
|
||||
for (Entry<ServerName, List<RegionInfo>> entry : otherAssignments.entrySet()) {
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> entry : otherAssignments.entrySet()) {
|
||||
ServerName sn = entry.getKey();
|
||||
List<RegionInfo> regionsList = entry.getValue();
|
||||
if (assignmentMap.get(sn) == null) {
|
||||
|
@ -244,7 +241,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
return null;
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Assign the region to primary if its available. If both secondary and tertiary are available,
|
||||
* assign to the host which has less load. Else assign to secondary or tertiary whichever is
|
||||
* available (in that order).
|
||||
|
@ -252,16 +249,13 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
private void assignRegionToAvailableFavoredNode(
|
||||
Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes, RegionInfo region,
|
||||
ServerName primaryHost, ServerName secondaryHost, ServerName tertiaryHost) {
|
||||
|
||||
if (primaryHost != null) {
|
||||
addRegionToMap(assignmentMapForFavoredNodes, region, primaryHost);
|
||||
|
||||
} else if (secondaryHost != null && tertiaryHost != null) {
|
||||
|
||||
// Assign the region to the one with a lower load (both have the desired hdfs blocks)
|
||||
ServerName s;
|
||||
ServerMetrics tertiaryLoad = services.getServerManager().getLoad(tertiaryHost);
|
||||
ServerMetrics secondaryLoad = services.getServerManager().getLoad(secondaryHost);
|
||||
ServerMetrics tertiaryLoad = provider.getLoad(tertiaryHost);
|
||||
ServerMetrics secondaryLoad = provider.getLoad(secondaryHost);
|
||||
if (secondaryLoad != null && tertiaryLoad != null) {
|
||||
if (secondaryLoad.getRegionMetrics().size() < tertiaryLoad.getRegionMetrics().size()) {
|
||||
s = secondaryHost;
|
||||
|
@ -283,7 +277,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* If we have favored nodes for a region, we will return one of the FN as destination. If
|
||||
* favored nodes are not present for a region, we will generate and return one of the FN as
|
||||
* destination. If we can't generate anything, lets fallback.
|
||||
|
@ -334,7 +328,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
fnm.updateFavoredNodes(regionFNMap);
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Reuse BaseLoadBalancer's retainAssignment, but generate favored nodes when its missing.
|
||||
*/
|
||||
@Override
|
||||
|
@ -356,8 +350,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
|
||||
|
||||
try {
|
||||
for (Entry<ServerName, List<RegionInfo>> entry : result.entrySet()) {
|
||||
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> entry : result.entrySet()) {
|
||||
ServerName sn = entry.getKey();
|
||||
ServerName primary = ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE);
|
||||
|
||||
|
@ -420,7 +413,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
return assignmentMap;
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Return list of favored nodes that are online.
|
||||
*/
|
||||
private List<ServerName> getOnlineFavoredNodes(List<ServerName> onlineServers,
|
||||
|
@ -447,13 +440,13 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
|
||||
/**
|
||||
* Generate Favored Nodes for daughters during region split.
|
||||
*
|
||||
* <p/>
|
||||
* If the parent does not have FN, regenerates them for the daughters.
|
||||
*
|
||||
* <p/>
|
||||
* If the parent has FN, inherit two FN from parent for each daughter and generate the remaining.
|
||||
* The primary FN for both the daughters should be the same as parent. Inherit the secondary
|
||||
* FN from the parent but keep it different for each daughter. Choose the remaining FN
|
||||
* randomly. This would give us better distribution over a period of time after enough splits.
|
||||
* The primary FN for both the daughters should be the same as parent. Inherit the secondary FN
|
||||
* from the parent but keep it different for each daughter. Choose the remaining FN randomly. This
|
||||
* would give us better distribution over a period of time after enough splits.
|
||||
*/
|
||||
@Override
|
||||
public void generateFavoredNodesForDaughter(List<ServerName> servers, RegionInfo parent,
|
||||
|
@ -505,7 +498,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
return daughterFN;
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Generate favored nodes for a region during merge. Choose the FN from one of the sources to
|
||||
* keep it simple.
|
||||
*/
|
||||
|
@ -515,7 +508,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
updateFavoredNodesForRegion(merged, fnm.getFavoredNodes(mergeParents[0]));
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Pick favored nodes with the highest locality for a region with lowest locality.
|
||||
*/
|
||||
private class FavoredNodeLocalityPicker extends CandidateGenerator {
|
||||
|
@ -662,48 +655,43 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
*/
|
||||
@Override
|
||||
protected List<RegionPlan> balanceTable(TableName tableName,
|
||||
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
|
||||
if (this.services != null) {
|
||||
List<RegionPlan> regionPlans = Lists.newArrayList();
|
||||
Map<ServerName, List<RegionInfo>> correctAssignments = new HashMap<>();
|
||||
int misplacedRegions = 0;
|
||||
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
|
||||
List<RegionPlan> regionPlans = Lists.newArrayList();
|
||||
Map<ServerName, List<RegionInfo>> correctAssignments = new HashMap<>();
|
||||
int misplacedRegions = 0;
|
||||
|
||||
for (Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
|
||||
ServerName current = entry.getKey();
|
||||
List<RegionInfo> regions = Lists.newArrayList();
|
||||
correctAssignments.put(current, regions);
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
|
||||
ServerName current = entry.getKey();
|
||||
List<RegionInfo> regions = Lists.newArrayList();
|
||||
correctAssignments.put(current, regions);
|
||||
|
||||
for (RegionInfo hri : entry.getValue()) {
|
||||
List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
|
||||
if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, current) != null ||
|
||||
!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
|
||||
regions.add(hri);
|
||||
|
||||
} else {
|
||||
// No favored nodes, lets unassign.
|
||||
LOG.warn("Region not on favored nodes, unassign. Region: " + hri
|
||||
+ " current: " + current + " favored nodes: " + favoredNodes);
|
||||
try {
|
||||
this.services.getAssignmentManager().unassign(hri);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed unassign", e);
|
||||
continue;
|
||||
}
|
||||
RegionPlan rp = new RegionPlan(hri, null, null);
|
||||
regionPlans.add(rp);
|
||||
misplacedRegions++;
|
||||
for (RegionInfo hri : entry.getValue()) {
|
||||
List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
|
||||
if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, current) != null ||
|
||||
!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
|
||||
regions.add(hri);
|
||||
} else {
|
||||
// No favored nodes, lets unassign.
|
||||
LOG.warn("Region not on favored nodes, unassign. Region: " + hri + " current: " +
|
||||
current + " favored nodes: " + favoredNodes);
|
||||
try {
|
||||
provider.unassign(hri);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed unassign", e);
|
||||
continue;
|
||||
}
|
||||
RegionPlan rp = new RegionPlan(hri, null, null);
|
||||
regionPlans.add(rp);
|
||||
misplacedRegions++;
|
||||
}
|
||||
}
|
||||
LOG.debug("Found misplaced regions: " + misplacedRegions + ", not on favored nodes.");
|
||||
List<RegionPlan> regionPlansFromBalance = super.balanceTable(tableName, correctAssignments);
|
||||
if (regionPlansFromBalance != null) {
|
||||
regionPlans.addAll(regionPlansFromBalance);
|
||||
}
|
||||
return regionPlans;
|
||||
} else {
|
||||
return super.balanceTable(tableName, loadOfOneTable);
|
||||
}
|
||||
LOG.debug("Found misplaced regions: " + misplacedRegions + ", not on favored nodes.");
|
||||
List<RegionPlan> regionPlansFromBalance = super.balanceTable(tableName, correctAssignments);
|
||||
if (regionPlansFromBalance != null) {
|
||||
regionPlans.addAll(regionPlansFromBalance);
|
||||
}
|
||||
return regionPlans;
|
||||
}
|
||||
}
|
||||
|
|
@ -30,8 +30,8 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -286,8 +286,7 @@ public class TestFavoredNodeAssignmentHelper {
|
|||
}
|
||||
|
||||
private Triple<Map<RegionInfo, ServerName>, FavoredNodeAssignmentHelper, List<RegionInfo>>
|
||||
secondaryAndTertiaryRSPlacementHelper(
|
||||
int regionCount, Map<String, Integer> rackToServerCount) {
|
||||
secondaryAndTertiaryRSPlacementHelper(int regionCount, Map<String, Integer> rackToServerCount) {
|
||||
Map<RegionInfo, ServerName> primaryRSMap = new HashMap<RegionInfo, ServerName>();
|
||||
List<ServerName> servers = getServersFromRack(rackToServerCount);
|
||||
FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
|
||||
|
@ -321,7 +320,9 @@ public class TestFavoredNodeAssignmentHelper {
|
|||
assertTrue(helper.canPlaceFavoredNodes());
|
||||
|
||||
Map<ServerName, List<RegionInfo>> assignmentMap = new HashMap<>();
|
||||
if (primaryRSMap == null) primaryRSMap = new HashMap<>();
|
||||
if (primaryRSMap == null) {
|
||||
primaryRSMap = new HashMap<>();
|
||||
}
|
||||
// create some regions
|
||||
List<RegionInfo> regions = new ArrayList<>(regionCount);
|
||||
for (int i = 0; i < regionCount; i++) {
|
||||
|
@ -348,34 +349,36 @@ public class TestFavoredNodeAssignmentHelper {
|
|||
}
|
||||
// Verify that the regions got placed in the way we expect (documented in
|
||||
// FavoredNodeAssignmentHelper#placePrimaryRSAsRoundRobin)
|
||||
checkNumRegions(regionCount, firstRackSize, secondRackSize, thirdRackSize, regionsOnRack1,
|
||||
regionsOnRack2, regionsOnRack3, assignmentMap);
|
||||
checkNumRegions(firstRackSize, secondRackSize, thirdRackSize, regionsOnRack1, regionsOnRack2,
|
||||
regionsOnRack3);
|
||||
}
|
||||
|
||||
private void checkNumRegions(int regionCount, int firstRackSize, int secondRackSize,
|
||||
int thirdRackSize, int regionsOnRack1, int regionsOnRack2, int regionsOnRack3,
|
||||
Map<ServerName, List<RegionInfo>> assignmentMap) {
|
||||
//The regions should be distributed proportionately to the racksizes
|
||||
//Verify the ordering was as expected by inserting the racks and regions
|
||||
//in sorted maps. The keys being the racksize and numregions; values are
|
||||
//the relative positions of the racksizes and numregions respectively
|
||||
SortedMap<Integer, Integer> rackMap = new TreeMap<>();
|
||||
private void checkNumRegions(int firstRackSize, int secondRackSize, int thirdRackSize,
|
||||
int regionsOnRack1, int regionsOnRack2, int regionsOnRack3) {
|
||||
// The regions should be distributed proportionately to the racksizes
|
||||
// Verify the ordering was as expected by inserting the racks and regions
|
||||
// in sorted maps. The keys being the racksize and numregions; values are
|
||||
// the relative positions of the racksizes and numregions respectively
|
||||
NavigableMap<Integer, Integer> rackMap = new TreeMap<>();
|
||||
rackMap.put(firstRackSize, 1);
|
||||
rackMap.put(secondRackSize, 2);
|
||||
rackMap.put(thirdRackSize, 3);
|
||||
SortedMap<Integer, Integer> regionMap = new TreeMap<>();
|
||||
NavigableMap<Integer, Integer> regionMap = new TreeMap<>();
|
||||
regionMap.put(regionsOnRack1, 1);
|
||||
regionMap.put(regionsOnRack2, 2);
|
||||
regionMap.put(regionsOnRack3, 3);
|
||||
assertTrue(printProportions(firstRackSize, secondRackSize, thirdRackSize,
|
||||
regionsOnRack1, regionsOnRack2, regionsOnRack3),
|
||||
rackMap.get(firstRackSize) == regionMap.get(regionsOnRack1));
|
||||
assertTrue(printProportions(firstRackSize, secondRackSize, thirdRackSize,
|
||||
regionsOnRack1, regionsOnRack2, regionsOnRack3),
|
||||
rackMap.get(secondRackSize) == regionMap.get(regionsOnRack2));
|
||||
assertTrue(printProportions(firstRackSize, secondRackSize, thirdRackSize,
|
||||
regionsOnRack1, regionsOnRack2, regionsOnRack3),
|
||||
rackMap.get(thirdRackSize) == regionMap.get(regionsOnRack3));
|
||||
assertEquals(
|
||||
printProportions(firstRackSize, secondRackSize, thirdRackSize, regionsOnRack1, regionsOnRack2,
|
||||
regionsOnRack3),
|
||||
rackMap.get(firstRackSize).intValue(), regionMap.get(regionsOnRack1).intValue());
|
||||
assertEquals(
|
||||
printProportions(firstRackSize, secondRackSize, thirdRackSize, regionsOnRack1, regionsOnRack2,
|
||||
regionsOnRack3),
|
||||
rackMap.get(secondRackSize).intValue(), regionMap.get(regionsOnRack2).intValue());
|
||||
assertEquals(
|
||||
printProportions(firstRackSize, secondRackSize, thirdRackSize, regionsOnRack1, regionsOnRack2,
|
||||
regionsOnRack3),
|
||||
rackMap.get(thirdRackSize).intValue(), regionMap.get(regionsOnRack3).intValue());
|
||||
}
|
||||
|
||||
private String printProportions(int firstRackSize, int secondRackSize,
|
|
@ -58,6 +58,10 @@ public class DummyClusterInfoProvider implements ClusterInfoProvider {
|
|||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unassign(RegionInfo regionInfo) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
|
||||
return null;
|
||||
|
@ -79,6 +83,11 @@ public class DummyClusterInfoProvider implements ClusterInfoProvider {
|
|||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerName> getOnlineServersList() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> servers,
|
||||
Predicate<ServerMetrics> filter) {
|
||||
|
@ -107,4 +116,9 @@ public class DummyClusterInfoProvider implements ClusterInfoProvider {
|
|||
public void onConfigurationChange(Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerMetrics getLoad(ServerName serverName) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1003,11 +1003,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
}
|
||||
// Initialize after meta is up as below scans meta
|
||||
if (getFavoredNodesManager() != null && !maintenanceMode) {
|
||||
SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
|
||||
new SnapshotOfRegionAssignmentFromMeta(getConnection());
|
||||
snapshotOfRegionAssignment.initialize();
|
||||
getFavoredNodesManager().initialize(snapshotOfRegionAssignment);
|
||||
FavoredNodesManager fnm = getFavoredNodesManager();
|
||||
if (fnm != null) {
|
||||
fnm.initializeFromMeta();
|
||||
}
|
||||
|
||||
// set cluster status again after user regions are assigned
|
||||
|
|
|
@ -100,6 +100,14 @@ public class MasterClusterInfoProvider implements ClusterInfoProvider {
|
|||
return am != null ? am.getAssignedRegions() : Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unassign(RegionInfo regionInfo) throws IOException {
|
||||
AssignmentManager am = services.getAssignmentManager();
|
||||
if (am != null) {
|
||||
am.unassign(regionInfo);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
|
||||
TableDescriptors tds = services.getTableDescriptors();
|
||||
|
@ -127,6 +135,12 @@ public class MasterClusterInfoProvider implements ClusterInfoProvider {
|
|||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerName> getOnlineServersList() {
|
||||
ServerManager sm = services.getServerManager();
|
||||
return sm != null ? sm.getOnlineServersList() : Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> servers,
|
||||
Predicate<ServerMetrics> filter) {
|
||||
|
@ -170,6 +184,12 @@ public class MasterClusterInfoProvider implements ClusterInfoProvider {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerMetrics getLoad(ServerName serverName) {
|
||||
ServerManager sm = services.getServerManager();
|
||||
return sm != null ? sm.getLoad(serverName) : null;
|
||||
}
|
||||
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*/src/test/.*")
|
||||
NamedQueueRecorder getNamedQueueRecorder() {
|
||||
|
|
|
@ -34,14 +34,12 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.constraint.ConstraintException;
|
||||
import org.apache.hadoop.hbase.favored.FavoredNodeLoadBalancer;
|
||||
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
|
||||
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
|
||||
import org.apache.hadoop.hbase.master.LoadBalancer;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.balancer.ClusterInfoProvider;
|
||||
import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
|
||||
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
|
||||
import org.apache.hadoop.hbase.master.balancer.MasterClusterInfoProvider;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
|
@ -356,12 +354,7 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
|||
// special handling for favor node balancers
|
||||
if (internalBalancer instanceof FavoredNodesPromoter) {
|
||||
favoredNodesManager = new FavoredNodesManager(provider);
|
||||
if (internalBalancer instanceof FavoredNodeLoadBalancer) {
|
||||
((FavoredNodeLoadBalancer) internalBalancer).setMasterServices(masterServices);
|
||||
}
|
||||
if (internalBalancer instanceof FavoredStochasticBalancer) {
|
||||
((FavoredStochasticBalancer) internalBalancer).setMasterServices(masterServices);
|
||||
}
|
||||
((FavoredNodesPromoter) internalBalancer).setFavoredNodesManager(favoredNodesManager);
|
||||
}
|
||||
internalBalancer.initialize();
|
||||
// init fallback groups
|
||||
|
|
|
@ -306,7 +306,7 @@ public class TestTableFavoredNodes {
|
|||
snRSMap.put(rst.getMaster().getServerName(), rst.getMaster());
|
||||
}
|
||||
|
||||
int dnPort = fnm.getDataNodePort();
|
||||
int dnPort = FavoredNodeAssignmentHelper.getDataNodePort(TEST_UTIL.getConfiguration());
|
||||
RegionLocator regionLocator = admin.getConnection().getRegionLocator(tableName);
|
||||
for (HRegionLocation regionLocation : regionLocator.getAllRegionLocations()) {
|
||||
|
||||
|
|
|
@ -85,7 +85,8 @@ public class TestRegionPlacement2 {
|
|||
(FavoredNodeLoadBalancer) LoadBalancerFactory.getLoadBalancer(TEST_UTIL.getConfiguration());
|
||||
balancer.setClusterInfoProvider(
|
||||
new MasterClusterInfoProvider(TEST_UTIL.getMiniHBaseCluster().getMaster()));
|
||||
balancer.setMasterServices(TEST_UTIL.getMiniHBaseCluster().getMaster());
|
||||
balancer
|
||||
.setFavoredNodesManager(TEST_UTIL.getMiniHBaseCluster().getMaster().getFavoredNodesManager());
|
||||
balancer.initialize();
|
||||
List<ServerName> servers = new ArrayList<>();
|
||||
for (int i = 0; i < SLAVES; i++) {
|
||||
|
@ -149,7 +150,8 @@ public class TestRegionPlacement2 {
|
|||
(FavoredNodeLoadBalancer) LoadBalancerFactory.getLoadBalancer(TEST_UTIL.getConfiguration());
|
||||
balancer.setClusterInfoProvider(
|
||||
new MasterClusterInfoProvider(TEST_UTIL.getMiniHBaseCluster().getMaster()));
|
||||
balancer.setMasterServices(TEST_UTIL.getMiniHBaseCluster().getMaster());
|
||||
balancer
|
||||
.setFavoredNodesManager(TEST_UTIL.getMiniHBaseCluster().getMaster().getFavoredNodesManager());
|
||||
balancer.initialize();
|
||||
List<ServerName> servers = new ArrayList<>();
|
||||
for (int i = 0; i < SLAVES; i++) {
|
||||
|
|
Loading…
Reference in New Issue