HBASE-25852 Move all the intialization work of LoadBalancer implementation to initialize method (#3248)
Signed-off-by: Michael Stack <stack@apache.org> Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
parent
2b6a91a1da
commit
29bd3dd586
|
@ -59,10 +59,11 @@ public interface LoadBalancer extends Stoppable, ConfigurationObserver {
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
String HBASE_RSGROUP_LOADBALANCER_CLASS = "hbase.rsgroup.grouploadbalancer.class";
|
String HBASE_RSGROUP_LOADBALANCER_CLASS = "hbase.rsgroup.grouploadbalancer.class";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the current cluster status. This allows a LoadBalancer to map host name to a server
|
* Set the current cluster status. This allows a LoadBalancer to map host name to a server
|
||||||
*/
|
*/
|
||||||
void setClusterMetrics(ClusterMetrics st);
|
void updateClusterMetrics(ClusterMetrics metrics);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -25,7 +25,6 @@ import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.NavigableMap;
|
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
@ -53,6 +52,10 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||||
* The base class for load balancers. It provides the the functions used to by
|
* The base class for load balancers. It provides the the functions used to by
|
||||||
* {@code AssignmentManager} to assign regions in the edge cases. It doesn't provide an
|
* {@code AssignmentManager} to assign regions in the edge cases. It doesn't provide an
|
||||||
* implementation of the actual balancing algorithm.
|
* implementation of the actual balancing algorithm.
|
||||||
|
* <p/>
|
||||||
|
* Since 3.0.0, all the balancers will be wrapped inside a {@code RSGroupBasedLoadBalancer}, it will
|
||||||
|
* be in charge of the synchronization of balancing and configuration changing, so we do not need to
|
||||||
|
* synchronized by ourselves.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public abstract class BaseLoadBalancer implements LoadBalancer {
|
public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
|
@ -76,8 +79,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
|
|
||||||
// slop for regions
|
// slop for regions
|
||||||
protected float slop;
|
protected float slop;
|
||||||
// overallSlop to control simpleLoadBalancer's cluster level threshold
|
|
||||||
protected float overallSlop;
|
|
||||||
protected RackManager rackManager;
|
protected RackManager rackManager;
|
||||||
protected MetricsBalancer metricsBalancer = null;
|
protected MetricsBalancer metricsBalancer = null;
|
||||||
protected ClusterMetrics clusterStatus = null;
|
protected ClusterMetrics clusterStatus = null;
|
||||||
|
@ -103,38 +104,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
return provider.getConfiguration();
|
return provider.getConfiguration();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setConf(Configuration conf) {
|
|
||||||
setSlop(conf);
|
|
||||||
if (slop < 0) {
|
|
||||||
slop = 0;
|
|
||||||
} else if (slop > 1) {
|
|
||||||
slop = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (overallSlop < 0) {
|
|
||||||
overallSlop = 0;
|
|
||||||
} else if (overallSlop > 1) {
|
|
||||||
overallSlop = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.rackManager = new RackManager(conf);
|
|
||||||
useRegionFinder = conf.getBoolean("hbase.master.balancer.uselocality", true);
|
|
||||||
if (useRegionFinder) {
|
|
||||||
regionFinder = new RegionHDFSBlockLocationFinder();
|
|
||||||
regionFinder.setConf(conf);
|
|
||||||
}
|
|
||||||
this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
|
|
||||||
// Print out base configs. Don't print overallSlop since it for simple balancer exclusively.
|
|
||||||
LOG.info("slop={}", this.slop);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void setSlop(Configuration conf) {
|
|
||||||
this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
|
|
||||||
this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void setClusterMetrics(ClusterMetrics st) {
|
public void updateClusterMetrics(ClusterMetrics st) {
|
||||||
this.clusterStatus = st;
|
this.clusterStatus = st;
|
||||||
if (useRegionFinder) {
|
if (useRegionFinder) {
|
||||||
regionFinder.setClusterMetrics(st);
|
regionFinder.setClusterMetrics(st);
|
||||||
|
@ -145,10 +116,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
@Override
|
@Override
|
||||||
public void setClusterInfoProvider(ClusterInfoProvider provider) {
|
public void setClusterInfoProvider(ClusterInfoProvider provider) {
|
||||||
this.provider = provider;
|
this.provider = provider;
|
||||||
setConf(provider.getConfiguration());
|
|
||||||
if (useRegionFinder) {
|
|
||||||
this.regionFinder.setClusterInfoProvider(provider);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -158,57 +125,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setRackManager(RackManager rackManager) {
|
|
||||||
this.rackManager = rackManager;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected boolean needsBalance(TableName tableName, BalancerClusterState c) {
|
|
||||||
ClusterLoadState cs = new ClusterLoadState(c.clusterState);
|
|
||||||
if (cs.getNumServers() < MIN_SERVER_BALANCE) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Not running balancer because only " + cs.getNumServers()
|
|
||||||
+ " active regionserver(s)");
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (areSomeRegionReplicasColocated(c)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if(idleRegionServerExist(c)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we even need to do any load balancing
|
|
||||||
// HBASE-3681 check sloppiness first
|
|
||||||
float average = cs.getLoadAverage(); // for logging
|
|
||||||
int floor = (int) Math.floor(average * (1 - slop));
|
|
||||||
int ceiling = (int) Math.ceil(average * (1 + slop));
|
|
||||||
if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
|
|
||||||
NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
// If nothing to balance, then don't say anything unless trace-level logging.
|
|
||||||
LOG.trace("Skipping load balancing because balanced cluster; " +
|
|
||||||
"servers=" + cs.getNumServers() +
|
|
||||||
" regions=" + cs.getNumRegions() + " average=" + average +
|
|
||||||
" mostloaded=" + serversByLoad.lastKey().getLoad() +
|
|
||||||
" leastloaded=" + serversByLoad.firstKey().getLoad());
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Subclasses should implement this to return true if the cluster has nodes that hosts
|
|
||||||
* multiple replicas for the same region, or, if there are multiple racks and the same
|
|
||||||
* rack hosts replicas of the same region
|
|
||||||
* @param c Cluster information
|
|
||||||
* @return whether region replicas are currently co-located
|
|
||||||
*/
|
|
||||||
protected boolean areSomeRegionReplicasColocated(BalancerClusterState c) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected final boolean idleRegionServerExist(BalancerClusterState c){
|
protected final boolean idleRegionServerExist(BalancerClusterState c){
|
||||||
boolean isServerExistsWithMoreRegions = false;
|
boolean isServerExistsWithMoreRegions = false;
|
||||||
boolean isServerExistsWithZeroRegions = false;
|
boolean isServerExistsWithZeroRegions = false;
|
||||||
|
@ -456,8 +372,37 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
return Collections.unmodifiableMap(assignments);
|
return Collections.unmodifiableMap(assignments);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected final float normalizeSlop(float slop) {
|
||||||
|
if (slop < 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (slop > 1) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return slop;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected float getDefaultSlop() {
|
||||||
|
return 0.2f;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void loadConf(Configuration conf) {
|
||||||
|
this.slop =normalizeSlop(conf.getFloat("hbase.regions.slop", getDefaultSlop()));
|
||||||
|
this.rackManager = new RackManager(conf);
|
||||||
|
useRegionFinder = conf.getBoolean("hbase.master.balancer.uselocality", true);
|
||||||
|
if (useRegionFinder) {
|
||||||
|
regionFinder = new RegionHDFSBlockLocationFinder();
|
||||||
|
regionFinder.setConf(conf);
|
||||||
|
regionFinder.setClusterInfoProvider(provider);
|
||||||
|
}
|
||||||
|
this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
|
||||||
|
// Print out base configs. Don't print overallSlop since it for simple balancer exclusively.
|
||||||
|
LOG.info("slop={}", this.slop);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize() throws HBaseIOException{
|
public void initialize() {
|
||||||
|
loadConf(getConf());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -475,7 +420,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop(String why) {
|
public void stop(String why) {
|
||||||
LOG.info("Load Balancer stop requested: "+why);
|
LOG.info("Load Balancer stop requested: {}", why);
|
||||||
stopped = true;
|
stopped = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -642,7 +587,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
* @see #balanceTable(TableName, Map)
|
* @see #balanceTable(TableName, Map)
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public final synchronized List<RegionPlan>
|
public final List<RegionPlan>
|
||||||
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
|
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
|
||||||
preBalanceCluster(loadOfAllTable);
|
preBalanceCluster(loadOfAllTable);
|
||||||
if (isByTable) {
|
if (isByTable) {
|
||||||
|
@ -663,5 +608,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onConfigurationChange(Configuration conf) {
|
public void onConfigurationChange(Configuration conf) {
|
||||||
|
loadConf(conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master.balancer;
|
package org.apache.hadoop.hbase.master.balancer;
|
||||||
|
|
||||||
|
import com.google.errorprone.annotations.RestrictedApi;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -58,7 +59,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
|
||||||
private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
|
private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
|
||||||
private float avgLoadOverall;
|
private float avgLoadOverall;
|
||||||
private List<ServerAndLoad> serverLoadList = new ArrayList<>();
|
private List<ServerAndLoad> serverLoadList = new ArrayList<>();
|
||||||
|
// overallSlop to control simpleLoadBalancer's cluster level threshold
|
||||||
|
private float overallSlop;
|
||||||
/**
|
/**
|
||||||
* Stores additional per-server information about the regions added/removed
|
* Stores additional per-server information about the regions added/removed
|
||||||
* during the run of the balancing algorithm.
|
* during the run of the balancing algorithm.
|
||||||
|
@ -104,6 +106,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
|
||||||
/**
|
/**
|
||||||
* Pass RegionStates and allow balancer to set the current cluster load.
|
* Pass RegionStates and allow balancer to set the current cluster load.
|
||||||
*/
|
*/
|
||||||
|
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||||
|
allowedOnPath = ".*(/src/test/.*|SimpleLoadBalancer).java")
|
||||||
void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
|
void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
|
||||||
serverLoadList.clear();
|
serverLoadList.clear();
|
||||||
Map<ServerName, Integer> server2LoadMap = new HashMap<>();
|
Map<ServerName, Integer> server2LoadMap = new HashMap<>();
|
||||||
|
@ -129,11 +133,17 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
|
||||||
setClusterLoad(loadOfAllTable);
|
setClusterLoad(loadOfAllTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void loadConf(Configuration conf) {
|
||||||
|
super.loadConf(conf);
|
||||||
|
this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onConfigurationChange(Configuration conf) {
|
public void onConfigurationChange(Configuration conf) {
|
||||||
float originSlop = slop;
|
float originSlop = slop;
|
||||||
float originOverallSlop = overallSlop;
|
float originOverallSlop = overallSlop;
|
||||||
super.setConf(conf);
|
loadConf(conf);
|
||||||
LOG.info("Update configuration of SimpleLoadBalancer, previous slop is {},"
|
LOG.info("Update configuration of SimpleLoadBalancer, previous slop is {},"
|
||||||
+ " current slop is {}, previous overallSlop is {}, current overallSlop is {}",
|
+ " current slop is {}, previous overallSlop is {}, current overallSlop is {}",
|
||||||
originSlop, slop, originOverallSlop, overallSlop);
|
originSlop, slop, originOverallSlop, overallSlop);
|
||||||
|
@ -168,6 +178,38 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean needsBalance(BalancerClusterState c) {
|
||||||
|
ClusterLoadState cs = new ClusterLoadState(c.clusterState);
|
||||||
|
if (cs.getNumServers() < MIN_SERVER_BALANCE) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(
|
||||||
|
"Not running balancer because only " + cs.getNumServers() + " active regionserver(s)");
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (idleRegionServerExist(c)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we even need to do any load balancing
|
||||||
|
// HBASE-3681 check sloppiness first
|
||||||
|
float average = cs.getLoadAverage(); // for logging
|
||||||
|
int floor = (int) Math.floor(average * (1 - slop));
|
||||||
|
int ceiling = (int) Math.ceil(average * (1 + slop));
|
||||||
|
if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
|
||||||
|
NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
// If nothing to balance, then don't say anything unless trace-level logging.
|
||||||
|
LOG.trace("Skipping load balancing because balanced cluster; " + "servers=" +
|
||||||
|
cs.getNumServers() + " regions=" + cs.getNumRegions() + " average=" + average +
|
||||||
|
" mostloaded=" + serversByLoad.lastKey().getLoad() + " leastloaded=" +
|
||||||
|
serversByLoad.firstKey().getLoad());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generate a global load balancing plan according to the specified map of
|
* Generate a global load balancing plan according to the specified map of
|
||||||
* server information to the most loaded regions of each server.
|
* server information to the most loaded regions of each server.
|
||||||
|
@ -262,7 +304,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
|
||||||
// argument as defaults
|
// argument as defaults
|
||||||
BalancerClusterState c =
|
BalancerClusterState c =
|
||||||
new BalancerClusterState(loadOfOneTable, null, this.regionFinder, this.rackManager);
|
new BalancerClusterState(loadOfOneTable, null, this.regionFinder, this.rackManager);
|
||||||
if (!this.needsBalance(tableName, c) && !this.overallNeedsBalance()) {
|
if (!needsBalance(c) && !this.overallNeedsBalance()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
|
ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
|
||||||
|
|
|
@ -64,7 +64,8 @@ public class TestSimpleLoadBalancer extends BalancerTestBase {
|
||||||
conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
|
conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
|
||||||
conf.set("hbase.regions.slop", "0");
|
conf.set("hbase.regions.slop", "0");
|
||||||
loadBalancer = new SimpleLoadBalancer();
|
loadBalancer = new SimpleLoadBalancer();
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
|
||||||
|
loadBalancer.initialize();
|
||||||
}
|
}
|
||||||
|
|
||||||
int[] mockUniformCluster = new int[] { 5, 5, 5, 5, 5, 0 };
|
int[] mockUniformCluster = new int[] { 5, 5, 5, 5, 5, 0 };
|
||||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
|
import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
import org.apache.hadoop.hbase.master.RackManager;
|
|
||||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
import org.apache.hadoop.hbase.master.ServerManager;
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
|
import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
|
||||||
|
@ -70,7 +69,6 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeLoadBalancer.class);
|
private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeLoadBalancer.class);
|
||||||
|
|
||||||
private MasterServices services;
|
private MasterServices services;
|
||||||
private RackManager rackManager;
|
|
||||||
private FavoredNodesManager fnm;
|
private FavoredNodesManager fnm;
|
||||||
|
|
||||||
public void setMasterServices(MasterServices services) {
|
public void setMasterServices(MasterServices services) {
|
||||||
|
@ -78,10 +76,9 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void initialize() throws HBaseIOException {
|
public void initialize() {
|
||||||
super.initialize();
|
super.initialize();
|
||||||
this.fnm = services.getFavoredNodesManager();
|
this.fnm = services.getFavoredNodesManager();
|
||||||
this.rackManager = new RackManager(getConf());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -324,7 +321,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
|
public List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
|
||||||
return this.fnm.getFavoredNodes(regionInfo);
|
return this.fnm.getFavoredNodes(regionInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -928,8 +928,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
|
|
||||||
// initialize load balancer
|
// initialize load balancer
|
||||||
this.balancer.setMasterServices(this);
|
this.balancer.setMasterServices(this);
|
||||||
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
|
|
||||||
this.balancer.initialize();
|
this.balancer.initialize();
|
||||||
|
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
|
||||||
|
|
||||||
// start up all service threads.
|
// start up all service threads.
|
||||||
status.setStatus("Initializing master service threads");
|
status.setStatus("Initializing master service threads");
|
||||||
|
@ -1011,7 +1011,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
}
|
}
|
||||||
|
|
||||||
// set cluster status again after user regions are assigned
|
// set cluster status again after user regions are assigned
|
||||||
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
|
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
|
||||||
|
|
||||||
// Start balancer and meta catalog janitor after meta and regions have been assigned.
|
// Start balancer and meta catalog janitor after meta and regions have been assigned.
|
||||||
status.setStatus("Starting balancer and catalog janitor");
|
status.setStatus("Starting balancer and catalog janitor");
|
||||||
|
@ -1724,7 +1724,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
}
|
}
|
||||||
|
|
||||||
//Give the balancer the current cluster state.
|
//Give the balancer the current cluster state.
|
||||||
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
|
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
|
||||||
|
|
||||||
List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
|
List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
|
||||||
|
|
||||||
|
|
|
@ -46,7 +46,7 @@ public class ClusterStatusChore extends ScheduledChore {
|
||||||
@Override
|
@Override
|
||||||
protected void chore() {
|
protected void chore() {
|
||||||
try {
|
try {
|
||||||
balancer.setClusterMetrics(master.getClusterMetricsWithoutCoprocessor());
|
balancer.updateClusterMetrics(master.getClusterMetricsWithoutCoprocessor());
|
||||||
} catch (InterruptedIOException e) {
|
} catch (InterruptedIOException e) {
|
||||||
LOG.warn("Ignoring interruption", e);
|
LOG.warn("Ignoring interruption", e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,16 +86,11 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize() throws HBaseIOException {
|
protected List<CandidateGenerator> createCandidateGenerators() {
|
||||||
configureGenerators();
|
|
||||||
super.initialize();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void configureGenerators() {
|
|
||||||
List<CandidateGenerator> fnPickers = new ArrayList<>(2);
|
List<CandidateGenerator> fnPickers = new ArrayList<>(2);
|
||||||
fnPickers.add(new FavoredNodeLoadPicker());
|
fnPickers.add(new FavoredNodeLoadPicker());
|
||||||
fnPickers.add(new FavoredNodeLocalityPicker());
|
fnPickers.add(new FavoredNodeLocalityPicker());
|
||||||
setCandidateGenerators(fnPickers);
|
return fnPickers;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -51,7 +51,7 @@ public class MaintenanceLoadBalancer implements LoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setClusterMetrics(ClusterMetrics st) {
|
public void updateClusterMetrics(ClusterMetrics st) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.BalancerDecision;
|
import org.apache.hadoop.hbase.client.BalancerDecision;
|
||||||
import org.apache.hadoop.hbase.client.BalancerRejection;
|
import org.apache.hadoop.hbase.client.BalancerRejection;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.master.RackManager;
|
||||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
|
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
|
||||||
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
|
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
|
||||||
|
@ -47,8 +48,6 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
|
* <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
|
||||||
* randomly try and mutate the cluster to Cprime. If F(Cprime) < F(C) then the
|
* randomly try and mutate the cluster to Cprime. If F(Cprime) < F(C) then the
|
||||||
|
@ -100,8 +99,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
* so that the balancer gets the full picture of all loads on the cluster.</p>
|
* so that the balancer gets the full picture of all loads on the cluster.</p>
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
|
|
||||||
justification="Complaint is about costFunctions not being synchronized; not end of the world")
|
|
||||||
public class StochasticLoadBalancer extends BaseLoadBalancer {
|
public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);
|
private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);
|
||||||
|
@ -164,80 +161,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
super(new MetricsStochasticBalancer());
|
super(new MetricsStochasticBalancer());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onConfigurationChange(Configuration conf) {
|
|
||||||
setConf(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected synchronized void setConf(Configuration conf) {
|
|
||||||
super.setConf(conf);
|
|
||||||
maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
|
|
||||||
stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
|
|
||||||
maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
|
|
||||||
runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
|
|
||||||
|
|
||||||
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
|
|
||||||
minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
|
|
||||||
if (localityCandidateGenerator == null) {
|
|
||||||
localityCandidateGenerator = new LocalityBasedCandidateGenerator();
|
|
||||||
}
|
|
||||||
localityCost = new ServerLocalityCostFunction(conf);
|
|
||||||
rackLocalityCost = new RackLocalityCostFunction(conf);
|
|
||||||
|
|
||||||
if (this.candidateGenerators == null) {
|
|
||||||
candidateGenerators = Lists.newArrayList();
|
|
||||||
candidateGenerators.add(new RandomCandidateGenerator());
|
|
||||||
candidateGenerators.add(new LoadCandidateGenerator());
|
|
||||||
candidateGenerators.add(localityCandidateGenerator);
|
|
||||||
candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
|
|
||||||
}
|
|
||||||
regionLoadFunctions = new CostFromRegionLoadFunction[] {
|
|
||||||
new ReadRequestCostFunction(conf),
|
|
||||||
new CPRequestCostFunction(conf),
|
|
||||||
new WriteRequestCostFunction(conf),
|
|
||||||
new MemStoreSizeCostFunction(conf),
|
|
||||||
new StoreFileCostFunction(conf)
|
|
||||||
};
|
|
||||||
regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
|
|
||||||
regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
|
|
||||||
|
|
||||||
costFunctions = new ArrayList<>();
|
|
||||||
addCostFunction(new RegionCountSkewCostFunction(conf));
|
|
||||||
addCostFunction(new PrimaryRegionCountSkewCostFunction(conf));
|
|
||||||
addCostFunction(new MoveCostFunction(conf));
|
|
||||||
addCostFunction(localityCost);
|
|
||||||
addCostFunction(rackLocalityCost);
|
|
||||||
addCostFunction(new TableSkewCostFunction(conf));
|
|
||||||
addCostFunction(regionReplicaHostCostFunction);
|
|
||||||
addCostFunction(regionReplicaRackCostFunction);
|
|
||||||
addCostFunction(regionLoadFunctions[0]);
|
|
||||||
addCostFunction(regionLoadFunctions[1]);
|
|
||||||
addCostFunction(regionLoadFunctions[2]);
|
|
||||||
addCostFunction(regionLoadFunctions[3]);
|
|
||||||
addCostFunction(regionLoadFunctions[4]);
|
|
||||||
loadCustomCostFunctions(conf);
|
|
||||||
|
|
||||||
curFunctionCosts= new double[costFunctions.size()];
|
|
||||||
tempFunctionCosts= new double[costFunctions.size()];
|
|
||||||
|
|
||||||
isBalancerDecisionRecording = conf
|
|
||||||
.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
|
|
||||||
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
|
|
||||||
isBalancerRejectionRecording = conf
|
|
||||||
.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
|
|
||||||
BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
|
|
||||||
|
|
||||||
if (this.namedQueueRecorder == null && (isBalancerDecisionRecording
|
|
||||||
|| isBalancerRejectionRecording)) {
|
|
||||||
this.namedQueueRecorder = NamedQueueRecorder.getInstance(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
|
|
||||||
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
|
|
||||||
Arrays.toString(getCostFunctionNames()) + " etc.");
|
|
||||||
}
|
|
||||||
|
|
||||||
private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
|
private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
try {
|
try {
|
||||||
|
@ -269,10 +192,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
|
|
||||||
this.candidateGenerators = customCandidateGenerators;
|
|
||||||
}
|
|
||||||
|
|
||||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||||
allowedOnPath = ".*/src/test/.*")
|
allowedOnPath = ".*/src/test/.*")
|
||||||
List<CandidateGenerator> getCandidateGenerators() {
|
List<CandidateGenerator> getCandidateGenerators() {
|
||||||
|
@ -280,13 +199,81 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void setSlop(Configuration conf) {
|
protected float getDefaultSlop() {
|
||||||
this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
|
return 0.001f;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<CandidateGenerator> createCandidateGenerators() {
|
||||||
|
List<CandidateGenerator> candidateGenerators = new ArrayList<CandidateGenerator>(4);
|
||||||
|
candidateGenerators.add(new RandomCandidateGenerator());
|
||||||
|
candidateGenerators.add(new LoadCandidateGenerator());
|
||||||
|
candidateGenerators.add(localityCandidateGenerator);
|
||||||
|
candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
|
||||||
|
return candidateGenerators;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void setClusterMetrics(ClusterMetrics st) {
|
protected void loadConf(Configuration conf) {
|
||||||
super.setClusterMetrics(st);
|
super.loadConf(conf);
|
||||||
|
maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
|
||||||
|
stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
|
||||||
|
maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
|
||||||
|
runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
|
||||||
|
|
||||||
|
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
|
||||||
|
minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
|
||||||
|
if (localityCandidateGenerator == null) {
|
||||||
|
localityCandidateGenerator = new LocalityBasedCandidateGenerator();
|
||||||
|
}
|
||||||
|
localityCost = new ServerLocalityCostFunction(conf);
|
||||||
|
rackLocalityCost = new RackLocalityCostFunction(conf);
|
||||||
|
|
||||||
|
this.candidateGenerators = createCandidateGenerators();
|
||||||
|
|
||||||
|
regionLoadFunctions = new CostFromRegionLoadFunction[] { new ReadRequestCostFunction(conf),
|
||||||
|
new CPRequestCostFunction(conf), new WriteRequestCostFunction(conf),
|
||||||
|
new MemStoreSizeCostFunction(conf), new StoreFileCostFunction(conf) };
|
||||||
|
regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
|
||||||
|
regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
|
||||||
|
|
||||||
|
costFunctions = new ArrayList<>();
|
||||||
|
addCostFunction(new RegionCountSkewCostFunction(conf));
|
||||||
|
addCostFunction(new PrimaryRegionCountSkewCostFunction(conf));
|
||||||
|
addCostFunction(new MoveCostFunction(conf));
|
||||||
|
addCostFunction(localityCost);
|
||||||
|
addCostFunction(rackLocalityCost);
|
||||||
|
addCostFunction(new TableSkewCostFunction(conf));
|
||||||
|
addCostFunction(regionReplicaHostCostFunction);
|
||||||
|
addCostFunction(regionReplicaRackCostFunction);
|
||||||
|
addCostFunction(regionLoadFunctions[0]);
|
||||||
|
addCostFunction(regionLoadFunctions[1]);
|
||||||
|
addCostFunction(regionLoadFunctions[2]);
|
||||||
|
addCostFunction(regionLoadFunctions[3]);
|
||||||
|
addCostFunction(regionLoadFunctions[4]);
|
||||||
|
loadCustomCostFunctions(conf);
|
||||||
|
|
||||||
|
curFunctionCosts = new double[costFunctions.size()];
|
||||||
|
tempFunctionCosts = new double[costFunctions.size()];
|
||||||
|
|
||||||
|
isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
|
||||||
|
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
|
||||||
|
isBalancerRejectionRecording =
|
||||||
|
conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
|
||||||
|
BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
|
||||||
|
|
||||||
|
if (this.namedQueueRecorder == null &&
|
||||||
|
(isBalancerDecisionRecording || isBalancerRejectionRecording)) {
|
||||||
|
this.namedQueueRecorder = NamedQueueRecorder.getInstance(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
|
||||||
|
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
|
||||||
|
Arrays.toString(getCostFunctionNames()) + " etc.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateClusterMetrics(ClusterMetrics st) {
|
||||||
|
super.updateClusterMetrics(st);
|
||||||
updateRegionLoad();
|
updateRegionLoad();
|
||||||
for (CostFromRegionLoadFunction cost : regionLoadFunctions) {
|
for (CostFromRegionLoadFunction cost : regionLoadFunctions) {
|
||||||
cost.setClusterMetrics(st);
|
cost.setClusterMetrics(st);
|
||||||
|
@ -315,8 +302,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private boolean areSomeRegionReplicasColocated(BalancerClusterState c) {
|
||||||
protected synchronized boolean areSomeRegionReplicasColocated(BalancerClusterState c) {
|
|
||||||
regionReplicaHostCostFunction.init(c);
|
regionReplicaHostCostFunction.init(c);
|
||||||
if (regionReplicaHostCostFunction.cost() > 0) {
|
if (regionReplicaHostCostFunction.cost() > 0) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -328,8 +314,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||||
protected boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
|
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
|
||||||
|
boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
|
||||||
ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
|
ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
|
||||||
if (cs.getNumServers() < MIN_SERVER_BALANCE) {
|
if (cs.getNumServers() < MIN_SERVER_BALANCE) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -400,6 +387,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
.generate(cluster);
|
.generate(cluster);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||||
|
allowedOnPath = ".*/src/test/.*")
|
||||||
|
void setRackManager(RackManager rackManager) {
|
||||||
|
this.rackManager = rackManager;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given the cluster state this will try and approach an optimal balance. This
|
* Given the cluster state this will try and approach an optimal balance. This
|
||||||
* should always approach the optimal state given enough steps.
|
* should always approach the optimal state given enough steps.
|
||||||
|
@ -649,7 +642,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
/**
|
/**
|
||||||
* Store the current region loads.
|
* Store the current region loads.
|
||||||
*/
|
*/
|
||||||
private synchronized void updateRegionLoad() {
|
private void updateRegionLoad() {
|
||||||
// We create a new hashmap so that regions that are no longer there are removed.
|
// We create a new hashmap so that regions that are no longer there are removed.
|
||||||
// However we temporarily need the old loads so we can use them to keep the rolling average.
|
// However we temporarily need the old loads so we can use them to keep the rolling average.
|
||||||
Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
|
Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
|
||||||
|
|
|
@ -73,7 +73,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||||
public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);
|
private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);
|
||||||
|
|
||||||
private ClusterMetrics clusterStatus;
|
|
||||||
private MasterServices masterServices;
|
private MasterServices masterServices;
|
||||||
private FavoredNodesManager favoredNodesManager;
|
private FavoredNodesManager favoredNodesManager;
|
||||||
private volatile RSGroupInfoManager rsGroupInfoManager;
|
private volatile RSGroupInfoManager rsGroupInfoManager;
|
||||||
|
@ -96,12 +95,11 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public RSGroupBasedLoadBalancer() {}
|
public RSGroupBasedLoadBalancer() {}
|
||||||
|
|
||||||
|
// must be called after calling initialize
|
||||||
@Override
|
@Override
|
||||||
public void setClusterMetrics(ClusterMetrics sm) {
|
public synchronized void updateClusterMetrics(ClusterMetrics sm) {
|
||||||
this.clusterStatus = sm;
|
assert internalBalancer != null;
|
||||||
if (internalBalancer != null) {
|
internalBalancer.updateClusterMetrics(sm);
|
||||||
internalBalancer.setClusterMetrics(sm);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMasterServices(MasterServices masterServices) {
|
public void setMasterServices(MasterServices masterServices) {
|
||||||
|
@ -112,7 +110,7 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
||||||
* Balance by RSGroup.
|
* Balance by RSGroup.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<RegionPlan> balanceCluster(
|
public synchronized List<RegionPlan> balanceCluster(
|
||||||
Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
|
Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
|
||||||
if (!isOnline()) {
|
if (!isOnline()) {
|
||||||
throw new ConstraintException(
|
throw new ConstraintException(
|
||||||
|
@ -335,6 +333,7 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
||||||
// Create the balancer
|
// Create the balancer
|
||||||
Configuration conf = masterServices.getConfiguration();
|
Configuration conf = masterServices.getConfiguration();
|
||||||
Class<? extends LoadBalancer> balancerClass;
|
Class<? extends LoadBalancer> balancerClass;
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
String balancerClassName = conf.get(HBASE_RSGROUP_LOADBALANCER_CLASS);
|
String balancerClassName = conf.get(HBASE_RSGROUP_LOADBALANCER_CLASS);
|
||||||
if (balancerClassName == null) {
|
if (balancerClassName == null) {
|
||||||
balancerClass = conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
|
balancerClass = conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
|
||||||
|
@ -352,9 +351,6 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
||||||
}
|
}
|
||||||
internalBalancer = ReflectionUtils.newInstance(balancerClass);
|
internalBalancer = ReflectionUtils.newInstance(balancerClass);
|
||||||
internalBalancer.setClusterInfoProvider(new MasterClusterInfoProvider(masterServices));
|
internalBalancer.setClusterInfoProvider(new MasterClusterInfoProvider(masterServices));
|
||||||
if(clusterStatus != null) {
|
|
||||||
internalBalancer.setClusterMetrics(clusterStatus);
|
|
||||||
}
|
|
||||||
// special handling for favor node balancers
|
// special handling for favor node balancers
|
||||||
if (internalBalancer instanceof FavoredNodesPromoter) {
|
if (internalBalancer instanceof FavoredNodesPromoter) {
|
||||||
favoredNodesManager = new FavoredNodesManager(masterServices);
|
favoredNodesManager = new FavoredNodesManager(masterServices);
|
||||||
|
@ -365,8 +361,6 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
||||||
((FavoredStochasticBalancer) internalBalancer).setMasterServices(masterServices);
|
((FavoredStochasticBalancer) internalBalancer).setMasterServices(masterServices);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
internalBalancer.initialize();
|
internalBalancer.initialize();
|
||||||
// init fallback groups
|
// init fallback groups
|
||||||
this.fallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
|
this.fallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
|
||||||
|
@ -393,7 +387,7 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onConfigurationChange(Configuration conf) {
|
public synchronized void onConfigurationChange(Configuration conf) {
|
||||||
boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
|
boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
|
||||||
if (fallbackEnabled != newFallbackEnabled) {
|
if (fallbackEnabled != newFallbackEnabled) {
|
||||||
LOG.info("Changing the value of {} from {} to {}", FALLBACK_GROUP_ENABLE_KEY,
|
LOG.info("Changing the value of {} from {} to {}", FALLBACK_GROUP_ENABLE_KEY,
|
||||||
|
@ -405,15 +399,12 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop(String why) {
|
public void stop(String why) {
|
||||||
|
internalBalancer.stop(why);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isStopped() {
|
public boolean isStopped() {
|
||||||
return false;
|
return internalBalancer.isStopped();
|
||||||
}
|
|
||||||
|
|
||||||
public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) {
|
|
||||||
this.rsGroupInfoManager = rsGroupInfoManager;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public LoadBalancer getInternalBalancer() {
|
public LoadBalancer getInternalBalancer() {
|
||||||
|
@ -425,7 +416,7 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postMasterStartupInitialize() {
|
public synchronized void postMasterStartupInitialize() {
|
||||||
this.internalBalancer.postMasterStartupInitialize();
|
this.internalBalancer.postMasterStartupInitialize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,19 +17,18 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master.balancer;
|
package org.apache.hadoop.hbase.master.balancer;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used for FavoredNode unit tests
|
* Used for FavoredNode unit tests
|
||||||
*/
|
*/
|
||||||
public class LoadOnlyFavoredStochasticBalancer extends FavoredStochasticBalancer {
|
public class LoadOnlyFavoredStochasticBalancer extends FavoredStochasticBalancer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void configureGenerators() {
|
protected List<CandidateGenerator> createCandidateGenerators() {
|
||||||
List<CandidateGenerator> fnPickers = Lists.newArrayList();
|
List<CandidateGenerator> fnPickers = new ArrayList<>(1);
|
||||||
fnPickers.add(new FavoredNodeLoadPicker());
|
fnPickers.add(new FavoredNodeLoadPicker());
|
||||||
setCandidateGenerators(fnPickers);
|
return fnPickers;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -410,6 +410,8 @@ public class RSGroupableBalancerTestBase extends BalancerTestBase {
|
||||||
AssignmentManager am = Mockito.mock(AssignmentManager.class);
|
AssignmentManager am = Mockito.mock(AssignmentManager.class);
|
||||||
Mockito.when(services.getAssignmentManager()).thenReturn(am);
|
Mockito.when(services.getAssignmentManager()).thenReturn(am);
|
||||||
Mockito.when(services.getConfiguration()).thenReturn(conf);
|
Mockito.when(services.getConfiguration()).thenReturn(conf);
|
||||||
|
RSGroupInfoManager manager = getMockedGroupInfoManager();
|
||||||
|
Mockito.when(services.getRSGroupInfoManager()).thenReturn(manager);
|
||||||
return services;
|
return services;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,8 @@ public class StochasticBalancerTestBase extends BalancerTestBase {
|
||||||
conf.setFloat("hbase.regions.slop", 0.0f);
|
conf.setFloat("hbase.regions.slop", 0.0f);
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
|
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
|
||||||
loadBalancer = new StochasticLoadBalancer();
|
loadBalancer = new StochasticLoadBalancer();
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
|
||||||
|
loadBalancer.initialize();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void testWithCluster(int numNodes, int numRegions, int numRegionsPerServer,
|
protected void testWithCluster(int numNodes, int numRegions, int numRegionsPerServer,
|
||||||
|
|
|
@ -29,12 +29,12 @@ public class StochasticBalancerTestBase2 extends StochasticBalancerTestBase {
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
|
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
|
||||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
|
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
|
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void after() {
|
public void after() {
|
||||||
// reset config to make sure balancer run
|
// reset config to make sure balancer run
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master.balancer;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
@ -56,7 +55,7 @@ public class TestBalancerDecision extends StochasticBalancerTestBase {
|
||||||
@Test
|
@Test
|
||||||
public void testBalancerDecisions() {
|
public void testBalancerDecisions() {
|
||||||
conf.setBoolean("hbase.master.balancer.decision.buffer.enabled", true);
|
conf.setBoolean("hbase.master.balancer.decision.buffer.enabled", true);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
float minCost = conf.getFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
|
float minCost = conf.getFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 1.0f);
|
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 1.0f);
|
||||||
try {
|
try {
|
||||||
|
@ -64,7 +63,7 @@ public class TestBalancerDecision extends StochasticBalancerTestBase {
|
||||||
boolean[] perTableBalancerConfigs = {true, false};
|
boolean[] perTableBalancerConfigs = {true, false};
|
||||||
for (boolean isByTable : perTableBalancerConfigs) {
|
for (boolean isByTable : perTableBalancerConfigs) {
|
||||||
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
|
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
for (int[] mockCluster : clusterStateMocks) {
|
for (int[] mockCluster : clusterStateMocks) {
|
||||||
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
|
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
|
||||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
||||||
|
@ -93,7 +92,7 @@ public class TestBalancerDecision extends StochasticBalancerTestBase {
|
||||||
// reset config
|
// reset config
|
||||||
conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE);
|
conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE);
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", minCost);
|
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", minCost);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,5 +100,4 @@ public class TestBalancerDecision extends StochasticBalancerTestBase {
|
||||||
return (Arrays.stream(cluster).anyMatch(x -> x > 1)) && (Arrays.stream(cluster)
|
return (Arrays.stream(cluster).anyMatch(x -> x > 1)) && (Arrays.stream(cluster)
|
||||||
.anyMatch(x -> x < 1));
|
.anyMatch(x -> x < 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,7 +82,7 @@ public class TestBalancerRejection extends StochasticBalancerTestBase {
|
||||||
//enabled balancer rejection recording
|
//enabled balancer rejection recording
|
||||||
conf.setBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED, true);
|
conf.setBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED, true);
|
||||||
conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY, MockCostFunction.class.getName());
|
conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY, MockCostFunction.class.getName());
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
//Simulate 2 servers with 5 regions.
|
//Simulate 2 servers with 5 regions.
|
||||||
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(new int[] { 5, 5 });
|
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(new int[] { 5, 5 });
|
||||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable = (Map) mockClusterServersWithTables(servers);
|
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable = (Map) mockClusterServersWithTables(servers);
|
||||||
|
@ -95,7 +95,7 @@ public class TestBalancerRejection extends StochasticBalancerTestBase {
|
||||||
//Reject case 2: Cost < minCostNeedBalance
|
//Reject case 2: Cost < minCostNeedBalance
|
||||||
MockCostFunction.mockCost = 1;
|
MockCostFunction.mockCost = 1;
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", Float.MAX_VALUE);
|
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", Float.MAX_VALUE);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
Assert.assertNull(loadBalancer.balanceCluster(LoadOfAllTable));
|
Assert.assertNull(loadBalancer.balanceCluster(LoadOfAllTable));
|
||||||
|
|
||||||
//NamedQueue is an async Producer-consumer Pattern, waiting here until it completed
|
//NamedQueue is an async Producer-consumer Pattern, waiting here until it completed
|
||||||
|
@ -113,7 +113,7 @@ public class TestBalancerRejection extends StochasticBalancerTestBase {
|
||||||
}finally {
|
}finally {
|
||||||
conf.unset(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY);
|
conf.unset(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY);
|
||||||
conf.unset(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED);
|
conf.unset(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.net.Address;
|
||||||
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
|
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
|
||||||
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
|
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -68,8 +69,8 @@ public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase {
|
||||||
tableDescs = constructTableDesc(true);
|
tableDescs = constructTableDesc(true);
|
||||||
conf.set("hbase.regions.slop", "0");
|
conf.set("hbase.regions.slop", "0");
|
||||||
conf.set("hbase.rsgroup.grouploadbalancer.class", SimpleLoadBalancer.class.getCanonicalName());
|
conf.set("hbase.rsgroup.grouploadbalancer.class", SimpleLoadBalancer.class.getCanonicalName());
|
||||||
|
conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
|
||||||
loadBalancer = new RSGroupBasedLoadBalancer();
|
loadBalancer = new RSGroupBasedLoadBalancer();
|
||||||
loadBalancer.setRsGroupInfoManager(getMockedGroupInfoManager());
|
|
||||||
loadBalancer.setMasterServices(getMockedMaster());
|
loadBalancer.setMasterServices(getMockedMaster());
|
||||||
loadBalancer.initialize();
|
loadBalancer.initialize();
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,10 +38,12 @@ import org.apache.hadoop.hbase.Size;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
|
import org.apache.hadoop.hbase.master.balancer.BalancerTestBase.MockMapping;
|
||||||
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
|
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
|
||||||
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
|
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -68,8 +70,8 @@ public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.readRequestCost", 10000f);
|
conf.setFloat("hbase.master.balancer.stochastic.readRequestCost", 10000f);
|
||||||
conf.set("hbase.rsgroup.grouploadbalancer.class",
|
conf.set("hbase.rsgroup.grouploadbalancer.class",
|
||||||
StochasticLoadBalancer.class.getCanonicalName());
|
StochasticLoadBalancer.class.getCanonicalName());
|
||||||
|
conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
|
||||||
loadBalancer = new RSGroupBasedLoadBalancer();
|
loadBalancer = new RSGroupBasedLoadBalancer();
|
||||||
loadBalancer.setRsGroupInfoManager(getMockedGroupInfoManager());
|
|
||||||
loadBalancer.setMasterServices(getMockedMaster());
|
loadBalancer.setMasterServices(getMockedMaster());
|
||||||
loadBalancer.initialize();
|
loadBalancer.initialize();
|
||||||
}
|
}
|
||||||
|
@ -114,7 +116,7 @@ public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal
|
||||||
serverMetricsMap.put(serverC, mockServerMetricsWithReadRequests(serverC, regionsOnServerC, 0));
|
serverMetricsMap.put(serverC, mockServerMetricsWithReadRequests(serverC, regionsOnServerC, 0));
|
||||||
ClusterMetrics clusterStatus = mock(ClusterMetrics.class);
|
ClusterMetrics clusterStatus = mock(ClusterMetrics.class);
|
||||||
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
|
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
|
||||||
loadBalancer.setClusterMetrics(clusterStatus);
|
loadBalancer.updateClusterMetrics(clusterStatus);
|
||||||
|
|
||||||
// ReadRequestCostFunction are Rate based, So doing setClusterMetrics again
|
// ReadRequestCostFunction are Rate based, So doing setClusterMetrics again
|
||||||
// this time, regions on serverA with more readRequestCount load
|
// this time, regions on serverA with more readRequestCount load
|
||||||
|
@ -129,7 +131,7 @@ public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal
|
||||||
serverMetricsMap.put(serverC, mockServerMetricsWithReadRequests(serverC, regionsOnServerC, 0));
|
serverMetricsMap.put(serverC, mockServerMetricsWithReadRequests(serverC, regionsOnServerC, 0));
|
||||||
clusterStatus = mock(ClusterMetrics.class);
|
clusterStatus = mock(ClusterMetrics.class);
|
||||||
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
|
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
|
||||||
loadBalancer.setClusterMetrics(clusterStatus);
|
loadBalancer.updateClusterMetrics(clusterStatus);
|
||||||
|
|
||||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
||||||
(Map) mockClusterServersWithTables(clusterState);
|
(Map) mockClusterServersWithTables(clusterState);
|
||||||
|
|
|
@ -133,7 +133,8 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
|
||||||
loadBalancer = new StochasticLoadBalancer();
|
loadBalancer = new StochasticLoadBalancer();
|
||||||
|
|
||||||
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false);
|
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
|
||||||
|
loadBalancer.initialize();
|
||||||
|
|
||||||
TableName tableName = HConstants.ENSEMBLE_TABLE_NAME;
|
TableName tableName = HConstants.ENSEMBLE_TABLE_NAME;
|
||||||
Map<ServerName, List<RegionInfo>> clusterState = mockClusterServers(mockCluster_ensemble);
|
Map<ServerName, List<RegionInfo>> clusterState = mockClusterServers(mockCluster_ensemble);
|
||||||
|
@ -162,7 +163,8 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
|
||||||
loadBalancer = new StochasticLoadBalancer();
|
loadBalancer = new StochasticLoadBalancer();
|
||||||
|
|
||||||
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, true);
|
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, true);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
|
||||||
|
loadBalancer.initialize();
|
||||||
|
|
||||||
// NOTE the size is normally set in setClusterMetrics, for test purpose, we set it manually
|
// NOTE the size is normally set in setClusterMetrics, for test purpose, we set it manually
|
||||||
// Tables: hbase:namespace, table1, table2
|
// Tables: hbase:namespace, table1, table2
|
||||||
|
|
|
@ -144,7 +144,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||||
public void testCPRequestCost() {
|
public void testCPRequestCost() {
|
||||||
// in order to pass needsBalance judgement
|
// in order to pass needsBalance judgement
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.cpRequestCost", 10000f);
|
conf.setFloat("hbase.master.balancer.stochastic.cpRequestCost", 10000f);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
// mock cluster State
|
// mock cluster State
|
||||||
Map<ServerName, List<RegionInfo>> clusterState = new HashMap<ServerName, List<RegionInfo>>();
|
Map<ServerName, List<RegionInfo>> clusterState = new HashMap<ServerName, List<RegionInfo>>();
|
||||||
ServerName serverA = randomServer(3).getServerName();
|
ServerName serverA = randomServer(3).getServerName();
|
||||||
|
@ -163,7 +163,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||||
serverMetricsMap.put(serverC, mockServerMetricsWithCpRequests(serverC, regionsOnServerC, 0));
|
serverMetricsMap.put(serverC, mockServerMetricsWithCpRequests(serverC, regionsOnServerC, 0));
|
||||||
ClusterMetrics clusterStatus = mock(ClusterMetrics.class);
|
ClusterMetrics clusterStatus = mock(ClusterMetrics.class);
|
||||||
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
|
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
|
||||||
loadBalancer.setClusterMetrics(clusterStatus);
|
loadBalancer.updateClusterMetrics(clusterStatus);
|
||||||
|
|
||||||
// CPRequestCostFunction are Rate based, So doing setClusterMetrics again
|
// CPRequestCostFunction are Rate based, So doing setClusterMetrics again
|
||||||
// this time, regions on serverA with more cpRequestCount load
|
// this time, regions on serverA with more cpRequestCount load
|
||||||
|
@ -177,7 +177,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||||
serverMetricsMap.put(serverC, mockServerMetricsWithCpRequests(serverC, regionsOnServerC, 0));
|
serverMetricsMap.put(serverC, mockServerMetricsWithCpRequests(serverC, regionsOnServerC, 0));
|
||||||
clusterStatus = mock(ClusterMetrics.class);
|
clusterStatus = mock(ClusterMetrics.class);
|
||||||
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
|
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
|
||||||
loadBalancer.setClusterMetrics(clusterStatus);
|
loadBalancer.updateClusterMetrics(clusterStatus);
|
||||||
|
|
||||||
List<RegionPlan> plans =
|
List<RegionPlan> plans =
|
||||||
loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, clusterState);
|
loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, clusterState);
|
||||||
|
@ -196,7 +196,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||||
assertNull(loadBalancer.namedQueueRecorder);
|
assertNull(loadBalancer.namedQueueRecorder);
|
||||||
// reset config
|
// reset config
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.cpRequestCost", 5f);
|
conf.setFloat("hbase.master.balancer.stochastic.cpRequestCost", 5f);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -223,7 +223,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||||
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
|
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
|
||||||
// when(clusterStatus.getLoad(sn)).thenReturn(sl);
|
// when(clusterStatus.getLoad(sn)).thenReturn(sl);
|
||||||
|
|
||||||
loadBalancer.setClusterMetrics(clusterStatus);
|
loadBalancer.updateClusterMetrics(clusterStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
String regionNameAsString = RegionInfo.getRegionNameAsString(Bytes.toBytes(REGION_KEY));
|
String regionNameAsString = RegionInfo.getRegionNameAsString(Bytes.toBytes(REGION_KEY));
|
||||||
|
@ -249,7 +249,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||||
boolean[] perTableBalancerConfigs = {true, false};
|
boolean[] perTableBalancerConfigs = {true, false};
|
||||||
for (boolean isByTable : perTableBalancerConfigs) {
|
for (boolean isByTable : perTableBalancerConfigs) {
|
||||||
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
|
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
for (int[] mockCluster : clusterStateMocks) {
|
for (int[] mockCluster : clusterStateMocks) {
|
||||||
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
|
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
|
||||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
||||||
|
@ -263,7 +263,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||||
// reset config
|
// reset config
|
||||||
conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE);
|
conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE);
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", minCost);
|
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", minCost);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -378,7 +378,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||||
@Test
|
@Test
|
||||||
public void testCostAfterUndoAction() {
|
public void testCostAfterUndoAction() {
|
||||||
final int runs = 10;
|
final int runs = 10;
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
for (int[] mockCluster : clusterStateMocks) {
|
for (int[] mockCluster : clusterStateMocks) {
|
||||||
BalancerClusterState cluster = mockCluster(mockCluster);
|
BalancerClusterState cluster = mockCluster(mockCluster);
|
||||||
loadBalancer.initCosts(cluster);
|
loadBalancer.initCosts(cluster);
|
||||||
|
@ -505,13 +505,13 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||||
conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY,
|
conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY,
|
||||||
DummyCostFunction.class.getName());
|
DummyCostFunction.class.getName());
|
||||||
|
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
assertTrue(Arrays.
|
assertTrue(Arrays.
|
||||||
asList(loadBalancer.getCostFunctionNames()).
|
asList(loadBalancer.getCostFunctionNames()).
|
||||||
contains(DummyCostFunction.class.getSimpleName()));
|
contains(DummyCostFunction.class.getSimpleName()));
|
||||||
} finally {
|
} finally {
|
||||||
conf.unset(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY);
|
conf.unset(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -54,7 +54,7 @@ public class TestStochasticLoadBalancerBalanceCluster extends StochasticBalancer
|
||||||
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
|
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
|
||||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
|
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
for (int[] mockCluster : clusterStateMocks) {
|
for (int[] mockCluster : clusterStateMocks) {
|
||||||
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
|
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
|
||||||
List<ServerAndLoad> list = convertToList(servers);
|
List<ServerAndLoad> list = convertToList(servers);
|
||||||
|
|
|
@ -75,7 +75,8 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends StochasticBalan
|
||||||
HeterogeneousRegionCountCostFunction.HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE,
|
HeterogeneousRegionCountCostFunction.HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE,
|
||||||
RULES_FILE);
|
RULES_FILE);
|
||||||
loadBalancer = new StochasticLoadBalancer();
|
loadBalancer = new StochasticLoadBalancer();
|
||||||
loadBalancer.setConf(BalancerTestBase.conf);
|
loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
|
||||||
|
loadBalancer.initialize();
|
||||||
loadBalancer.getCandidateGenerators().add(new FairRandomCandidateGenerator());
|
loadBalancer.getCandidateGenerators().add(new FairRandomCandidateGenerator());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ public class TestStochasticLoadBalancerRegionReplicaHighReplication
|
||||||
public void testRegionReplicasOnMidClusterHighReplication() {
|
public void testRegionReplicasOnMidClusterHighReplication() {
|
||||||
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
|
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
|
||||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
|
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
int numNodes = 40;
|
int numNodes = 40;
|
||||||
int numRegions = 6 * numNodes;
|
int numRegions = 6 * numNodes;
|
||||||
int replication = 40; // 40 replicas per region, one for each server
|
int replication = 40; // 40 replicas per region, one for each server
|
||||||
|
|
|
@ -35,7 +35,7 @@ public class TestStochasticLoadBalancerRegionReplicaReplicationGreaterThanNumNod
|
||||||
@Test
|
@Test
|
||||||
public void testRegionReplicationOnMidClusterReplicationGreaterThanNumNodes() {
|
public void testRegionReplicationOnMidClusterReplicationGreaterThanNumNodes() {
|
||||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
|
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
int numNodes = 40;
|
int numNodes = 40;
|
||||||
int numRegions = 6 * 50;
|
int numRegions = 6 * 50;
|
||||||
int replication = 50; // 50 replicas per region, more than numNodes
|
int replication = 50; // 50 replicas per region, more than numNodes
|
||||||
|
|
|
@ -42,7 +42,7 @@ public class TestStochasticLoadBalancerRegionReplicaSameHosts extends Stochastic
|
||||||
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
|
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
|
||||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
|
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
int numHosts = 30;
|
int numHosts = 30;
|
||||||
int numRegions = 30 * 30;
|
int numRegions = 30 * 30;
|
||||||
int replication = 3; // 3 replicas per region
|
int replication = 3; // 3 replicas per region
|
||||||
|
|
|
@ -54,7 +54,7 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends Stochastic
|
||||||
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 10000000L);
|
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 10000000L);
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
||||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
|
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.onConfigurationChange(conf);
|
||||||
int numNodes = 30;
|
int numNodes = 30;
|
||||||
int numRegions = numNodes * 30;
|
int numRegions = numNodes * 30;
|
||||||
int replication = 3; // 3 replicas per region
|
int replication = 3; // 3 replicas per region
|
||||||
|
|
Loading…
Reference in New Issue