HBASE-13965 Stochastic Load Balancer JMX Metrics (Lei Chen)
This commit is contained in:
parent
20d1fa36e7
commit
598cfeb775
@ -181,6 +181,10 @@ public abstract class Filter {
|
|||||||
* Seek to next key which is given as hint by the filter.
|
* Seek to next key which is given as hint by the filter.
|
||||||
*/
|
*/
|
||||||
SEEK_NEXT_USING_HINT,
|
SEEK_NEXT_USING_HINT,
|
||||||
|
/**
|
||||||
|
* Include KeyValue and done with row, seek to next.
|
||||||
|
*/
|
||||||
|
INCLUDE_AND_SEEK_NEXT_ROW,
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -134,7 +134,8 @@ final public class FilterWrapper extends Filter {
|
|||||||
public enum FilterRowRetCode {
|
public enum FilterRowRetCode {
|
||||||
NOT_CALLED,
|
NOT_CALLED,
|
||||||
INCLUDE, // corresponds to filter.filterRow() returning false
|
INCLUDE, // corresponds to filter.filterRow() returning false
|
||||||
EXCLUDE // corresponds to filter.filterRow() returning true
|
EXCLUDE, // corresponds to filter.filterRow() returning true
|
||||||
|
INCLUDE_THIS_FAMILY // exclude other families
|
||||||
}
|
}
|
||||||
public FilterRowRetCode filterRowCellsWithRet(List<Cell> kvs) throws IOException {
|
public FilterRowRetCode filterRowCellsWithRet(List<Cell> kvs) throws IOException {
|
||||||
//To fix HBASE-6429,
|
//To fix HBASE-6429,
|
||||||
|
@ -124,6 +124,12 @@ public final class HConstants {
|
|||||||
/** Config for pluggable load balancers */
|
/** Config for pluggable load balancers */
|
||||||
public static final String HBASE_MASTER_LOADBALANCER_CLASS = "hbase.master.loadbalancer.class";
|
public static final String HBASE_MASTER_LOADBALANCER_CLASS = "hbase.master.loadbalancer.class";
|
||||||
|
|
||||||
|
/** Config for balancing the cluster by table */
|
||||||
|
public static final String HBASE_MASTER_LOADBALANCE_BYTABLE = "hbase.master.loadbalance.bytable";
|
||||||
|
|
||||||
|
/** The name of the ensemble table */
|
||||||
|
public static final String ENSEMBLE_TABLE_NAME = "hbase:ensemble";
|
||||||
|
|
||||||
/** Config for pluggable region normalizer */
|
/** Config for pluggable region normalizer */
|
||||||
public static final String HBASE_MASTER_NORMALIZER_CLASS =
|
public static final String HBASE_MASTER_NORMALIZER_CLASS =
|
||||||
"hbase.master.normalizer.class";
|
"hbase.master.normalizer.class";
|
||||||
|
@ -34,6 +34,7 @@ import java.util.HashSet;
|
|||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
@ -1256,12 +1257,14 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||||||
this.assignmentManager.getRegionStates().getAssignmentsByTable();
|
this.assignmentManager.getRegionStates().getAssignmentsByTable();
|
||||||
|
|
||||||
List<RegionPlan> plans = new ArrayList<RegionPlan>();
|
List<RegionPlan> plans = new ArrayList<RegionPlan>();
|
||||||
|
|
||||||
//Give the balancer the current cluster state.
|
//Give the balancer the current cluster state.
|
||||||
this.balancer.setClusterStatus(getClusterStatus());
|
this.balancer.setClusterStatus(getClusterStatus());
|
||||||
for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
|
for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
|
||||||
List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
|
List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
|
||||||
if (partialPlans != null) plans.addAll(partialPlans);
|
if (partialPlans != null) plans.addAll(partialPlans);
|
||||||
}
|
}
|
||||||
|
|
||||||
long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
|
long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
|
||||||
int rpCount = 0; // number of RegionPlans balanced so far
|
int rpCount = 0; // number of RegionPlans balanced so far
|
||||||
long totalRegPlanExecTime = 0;
|
long totalRegPlanExecTime = 0;
|
||||||
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseIOException;
|
|||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.Stoppable;
|
import org.apache.hadoop.hbase.Stoppable;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Makes decisions about the placement and movement of Regions across
|
* Makes decisions about the placement and movement of Regions across
|
||||||
@ -64,6 +65,15 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse
|
|||||||
*/
|
*/
|
||||||
void setMasterServices(MasterServices masterServices);
|
void setMasterServices(MasterServices masterServices);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Perform the major balance operation
|
||||||
|
* @param tableName
|
||||||
|
* @param clusterState
|
||||||
|
* @return List of plans
|
||||||
|
*/
|
||||||
|
List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
|
||||||
|
List<HRegionInfo>> clusterState) throws HBaseIOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Perform the major balance operation
|
* Perform the major balance operation
|
||||||
* @param clusterState
|
* @param clusterState
|
||||||
|
@ -953,13 +953,14 @@ public class RegionStates {
|
|||||||
Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
|
Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
|
||||||
new HashMap<TableName, Map<ServerName,List<HRegionInfo>>>();
|
new HashMap<TableName, Map<ServerName,List<HRegionInfo>>>();
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (!server.getConfiguration().getBoolean("hbase.master.loadbalance.bytable", false)) {
|
if (!server.getConfiguration().getBoolean(
|
||||||
|
HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false)) {
|
||||||
Map<ServerName, List<HRegionInfo>> svrToRegions =
|
Map<ServerName, List<HRegionInfo>> svrToRegions =
|
||||||
new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
|
new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
|
||||||
for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
|
for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
|
||||||
svrToRegions.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
|
svrToRegions.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
|
||||||
}
|
}
|
||||||
result.put(TableName.valueOf("ensemble"), svrToRegions);
|
result.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), svrToRegions);
|
||||||
} else {
|
} else {
|
||||||
for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
|
for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
|
||||||
for (HRegionInfo hri: e.getValue()) {
|
for (HRegionInfo hri: e.getValue()) {
|
||||||
|
@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.RegionLoad;
|
|||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
|
||||||
import org.apache.hadoop.hbase.master.LoadBalancer;
|
import org.apache.hadoop.hbase.master.LoadBalancer;
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
import org.apache.hadoop.hbase.master.RackManager;
|
import org.apache.hadoop.hbase.master.RackManager;
|
||||||
@ -81,6 +80,21 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The constructor that uses the basic MetricsBalancer
|
||||||
|
*/
|
||||||
|
protected BaseLoadBalancer() {
|
||||||
|
metricsBalancer = new MetricsBalancer();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This Constructor accepts an instance of MetricsBalancer,
|
||||||
|
* which will be used instead of creating a new one
|
||||||
|
*/
|
||||||
|
protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
|
||||||
|
this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An efficient array based implementation similar to ClusterState for keeping
|
* An efficient array based implementation similar to ClusterState for keeping
|
||||||
* the status of the cluster in terms of region assignment and distribution.
|
* the status of the cluster in terms of region assignment and distribution.
|
||||||
@ -803,7 +817,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||||||
"hbase.balancer.tablesOnMaster";
|
"hbase.balancer.tablesOnMaster";
|
||||||
|
|
||||||
protected final Set<String> tablesOnMaster = new HashSet<String>();
|
protected final Set<String> tablesOnMaster = new HashSet<String>();
|
||||||
protected final MetricsBalancer metricsBalancer = new MetricsBalancer();
|
protected MetricsBalancer metricsBalancer = null;
|
||||||
protected ClusterStatus clusterStatus = null;
|
protected ClusterStatus clusterStatus = null;
|
||||||
protected ServerName masterServerName;
|
protected ServerName masterServerName;
|
||||||
protected MasterServices services;
|
protected MasterServices services;
|
||||||
|
@ -28,11 +28,13 @@ import org.apache.commons.logging.Log;
|
|||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||||
import org.apache.hadoop.hbase.ServerLoad;
|
import org.apache.hadoop.hbase.ServerLoad;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.master.RackManager;
|
import org.apache.hadoop.hbase.master.RackManager;
|
||||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
import org.apache.hadoop.hbase.master.ServerManager;
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
@ -345,4 +347,10 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
|
|||||||
globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(region, favoredNodesForRegion);
|
globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(region, favoredNodesForRegion);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<RegionPlan> balanceCluster(TableName tableName,
|
||||||
|
Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException {
|
||||||
|
return balanceCluster(clusterState);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -25,9 +25,17 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
|||||||
*/
|
*/
|
||||||
public class MetricsBalancer {
|
public class MetricsBalancer {
|
||||||
|
|
||||||
private final MetricsBalancerSource source;
|
private MetricsBalancerSource source = null;
|
||||||
|
|
||||||
public MetricsBalancer() {
|
public MetricsBalancer() {
|
||||||
|
initSource();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A function to instantiate the metrics source. This function can be overridden in its
|
||||||
|
* subclasses to provide extended sources
|
||||||
|
*/
|
||||||
|
protected void initSource() {
|
||||||
source = CompatibilitySingletonFactory.getInstance(MetricsBalancerSource.class);
|
source = CompatibilitySingletonFactory.getInstance(MetricsBalancerSource.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -30,9 +30,11 @@ import java.util.TreeMap;
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
|
|
||||||
import com.google.common.collect.MinMaxPriorityQueue;
|
import com.google.common.collect.MinMaxPriorityQueue;
|
||||||
@ -433,4 +435,10 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
|
|||||||
rp.setDestination(sn);
|
rp.setDestination(sn);
|
||||||
regionsToReturn.add(rp);
|
regionsToReturn.add(rp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<RegionPlan> balanceCluster(TableName tableName,
|
||||||
|
Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException {
|
||||||
|
return balanceCluster(clusterState);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,10 +34,13 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.ClusterStatus;
|
import org.apache.hadoop.hbase.ClusterStatus;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.RegionLoad;
|
import org.apache.hadoop.hbase.RegionLoad;
|
||||||
import org.apache.hadoop.hbase.ServerLoad;
|
import org.apache.hadoop.hbase.ServerLoad;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
|
||||||
@ -102,6 +105,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
"hbase.master.balancer.stochastic.maxRunningTime";
|
"hbase.master.balancer.stochastic.maxRunningTime";
|
||||||
protected static final String KEEP_REGION_LOADS =
|
protected static final String KEEP_REGION_LOADS =
|
||||||
"hbase.master.balancer.stochastic.numRegionLoadsToRemember";
|
"hbase.master.balancer.stochastic.numRegionLoadsToRemember";
|
||||||
|
private static final String TABLE_FUNCTION_SEP = "_";
|
||||||
|
|
||||||
private static final Random RANDOM = new Random(System.currentTimeMillis());
|
private static final Random RANDOM = new Random(System.currentTimeMillis());
|
||||||
private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
|
private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
|
||||||
@ -117,12 +121,28 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
private CandidateGenerator[] candidateGenerators;
|
private CandidateGenerator[] candidateGenerators;
|
||||||
private CostFromRegionLoadFunction[] regionLoadFunctions;
|
private CostFromRegionLoadFunction[] regionLoadFunctions;
|
||||||
private CostFunction[] costFunctions;
|
private CostFunction[] costFunctions;
|
||||||
|
|
||||||
|
// to save and report costs to JMX
|
||||||
|
private Double curOverallCost = 0d;
|
||||||
|
private Double[] tempFunctionCosts;
|
||||||
|
private Double[] curFunctionCosts;
|
||||||
|
|
||||||
// Keep locality based picker and cost function to alert them
|
// Keep locality based picker and cost function to alert them
|
||||||
// when new services are offered
|
// when new services are offered
|
||||||
private LocalityBasedCandidateGenerator localityCandidateGenerator;
|
private LocalityBasedCandidateGenerator localityCandidateGenerator;
|
||||||
private LocalityCostFunction localityCost;
|
private LocalityCostFunction localityCost;
|
||||||
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
|
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
|
||||||
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
|
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
|
||||||
|
private boolean isByTable = false;
|
||||||
|
private TableName tableName = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
|
||||||
|
* default MetricsBalancer
|
||||||
|
*/
|
||||||
|
public StochasticLoadBalancer() {
|
||||||
|
super(new MetricsStochasticBalancer());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onConfigurationChange(Configuration conf) {
|
public void onConfigurationChange(Configuration conf) {
|
||||||
@ -140,6 +160,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
|
maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
|
||||||
|
|
||||||
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
|
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
|
||||||
|
isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
|
||||||
|
|
||||||
if (localityCandidateGenerator == null) {
|
if (localityCandidateGenerator == null) {
|
||||||
localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
|
localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
|
||||||
@ -178,6 +199,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
regionLoadFunctions[2],
|
regionLoadFunctions[2],
|
||||||
regionLoadFunctions[3],
|
regionLoadFunctions[3],
|
||||||
};
|
};
|
||||||
|
|
||||||
|
curFunctionCosts= new Double[costFunctions.length];
|
||||||
|
tempFunctionCosts= new Double[costFunctions.length];
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -192,6 +217,26 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
|
for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
|
||||||
cost.setClusterStatus(st);
|
cost.setClusterStatus(st);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update metrics size
|
||||||
|
try {
|
||||||
|
// by-table or ensemble mode
|
||||||
|
int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
|
||||||
|
int functionsCount = getCostFunctionNames().length;
|
||||||
|
|
||||||
|
updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("failed to get the size of all tables, exception = " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the number of metrics that are reported to JMX
|
||||||
|
*/
|
||||||
|
public void updateMetricsSize(int size) {
|
||||||
|
if (metricsBalancer instanceof MetricsStochasticBalancer) {
|
||||||
|
((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -211,6 +256,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
|
||||||
|
List<HRegionInfo>> clusterState) {
|
||||||
|
this.tableName = tableName;
|
||||||
|
return balanceCluster(clusterState);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given the cluster state this will try and approach an optimal balance. This
|
* Given the cluster state this will try and approach an optimal balance. This
|
||||||
* should always approach the optimal state given enough steps.
|
* should always approach the optimal state given enough steps.
|
||||||
@ -222,6 +274,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
if (plans != null || clusterState == null || clusterState.size() <= 1) {
|
if (plans != null || clusterState == null || clusterState.size() <= 1) {
|
||||||
return plans;
|
return plans;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (masterServerName != null && clusterState.containsKey(masterServerName)) {
|
if (masterServerName != null && clusterState.containsKey(masterServerName)) {
|
||||||
if (clusterState.size() <= 2) {
|
if (clusterState.size() <= 2) {
|
||||||
return null;
|
return null;
|
||||||
@ -243,6 +296,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
//of all the regions in the table(s) (that's true today)
|
//of all the regions in the table(s) (that's true today)
|
||||||
// Keep track of servers to iterate through them.
|
// Keep track of servers to iterate through them.
|
||||||
Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
|
Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
|
||||||
|
|
||||||
if (!needsBalance(cluster)) {
|
if (!needsBalance(cluster)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -252,6 +306,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
initCosts(cluster);
|
initCosts(cluster);
|
||||||
|
|
||||||
double currentCost = computeCost(cluster, Double.MAX_VALUE);
|
double currentCost = computeCost(cluster, Double.MAX_VALUE);
|
||||||
|
curOverallCost = currentCost;
|
||||||
|
for (int i = 0; i < this.curFunctionCosts.length; i++) {
|
||||||
|
curFunctionCosts[i] = tempFunctionCosts[i];
|
||||||
|
}
|
||||||
|
|
||||||
double initCost = currentCost;
|
double initCost = currentCost;
|
||||||
double newCost = currentCost;
|
double newCost = currentCost;
|
||||||
@ -278,6 +336,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
// Should this be kept?
|
// Should this be kept?
|
||||||
if (newCost < currentCost) {
|
if (newCost < currentCost) {
|
||||||
currentCost = newCost;
|
currentCost = newCost;
|
||||||
|
|
||||||
|
// save for JMX
|
||||||
|
curOverallCost = currentCost;
|
||||||
|
for (int i = 0; i < this.curFunctionCosts.length; i++) {
|
||||||
|
curFunctionCosts[i] = tempFunctionCosts[i];
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Put things back the way they were before.
|
// Put things back the way they were before.
|
||||||
// TODO: undo by remembering old values
|
// TODO: undo by remembering old values
|
||||||
@ -296,6 +360,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
|
|
||||||
metricsBalancer.balanceCluster(endTime - startTime);
|
metricsBalancer.balanceCluster(endTime - startTime);
|
||||||
|
|
||||||
|
// update costs metrics
|
||||||
|
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
|
||||||
if (initCost > currentCost) {
|
if (initCost > currentCost) {
|
||||||
plans = createRegionPlans(cluster);
|
plans = createRegionPlans(cluster);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
@ -305,6 +371,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
+ plans.size() + " regions; Going from a computed cost of "
|
+ plans.size() + " regions; Going from a computed cost of "
|
||||||
+ initCost + " to a new cost of " + currentCost);
|
+ initCost + " to a new cost of " + currentCost);
|
||||||
}
|
}
|
||||||
|
|
||||||
return plans;
|
return plans;
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
@ -315,6 +382,32 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* update costs to JMX
|
||||||
|
*/
|
||||||
|
private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
|
||||||
|
if (tableName == null) return;
|
||||||
|
|
||||||
|
// check if the metricsBalancer is MetricsStochasticBalancer before casting
|
||||||
|
if (metricsBalancer instanceof MetricsStochasticBalancer) {
|
||||||
|
MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
|
||||||
|
// overall cost
|
||||||
|
balancer.updateStochasticCost(tableName.getNameAsString(),
|
||||||
|
"Overall", "Overall cost", overall);
|
||||||
|
|
||||||
|
// each cost function
|
||||||
|
for (int i = 0; i < costFunctions.length; i++) {
|
||||||
|
CostFunction costFunction = costFunctions[i];
|
||||||
|
String costFunctionName = costFunction.getClass().getSimpleName();
|
||||||
|
Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
|
||||||
|
// TODO: cost function may need a specific description
|
||||||
|
balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
|
||||||
|
"The percent of " + costFunctionName, costPercent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create all of the RegionPlan's needed to move from the initial cluster state to the desired
|
* Create all of the RegionPlan's needed to move from the initial cluster state to the desired
|
||||||
* state.
|
* state.
|
||||||
@ -390,6 +483,20 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the names of the cost functions
|
||||||
|
*/
|
||||||
|
public String[] getCostFunctionNames() {
|
||||||
|
if (costFunctions == null) return null;
|
||||||
|
String[] ret = new String[costFunctions.length];
|
||||||
|
for (int i = 0; i < costFunctions.length; i++) {
|
||||||
|
CostFunction c = costFunctions[i];
|
||||||
|
ret[i] = c.getClass().getSimpleName();
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is the main cost function. It will compute a cost associated with a proposed cluster
|
* This is the main cost function. It will compute a cost associated with a proposed cluster
|
||||||
* state. All different costs will be combined with their multipliers to produce a double cost.
|
* state. All different costs will be combined with their multipliers to produce a double cost.
|
||||||
@ -402,17 +509,25 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
protected double computeCost(Cluster cluster, double previousCost) {
|
protected double computeCost(Cluster cluster, double previousCost) {
|
||||||
double total = 0;
|
double total = 0;
|
||||||
|
|
||||||
for (CostFunction c:costFunctions) {
|
for (int i = 0; i < costFunctions.length; i++) {
|
||||||
|
CostFunction c = costFunctions[i];
|
||||||
|
this.tempFunctionCosts[i] = 0.0;
|
||||||
|
|
||||||
if (c.getMultiplier() <= 0) {
|
if (c.getMultiplier() <= 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
total += c.getMultiplier() * c.cost();
|
Float multiplier = c.getMultiplier();
|
||||||
|
Double cost = c.cost();
|
||||||
|
|
||||||
|
this.tempFunctionCosts[i] = multiplier*cost;
|
||||||
|
total += this.tempFunctionCosts[i];
|
||||||
|
|
||||||
if (total > previousCost) {
|
if (total > previousCost) {
|
||||||
return total;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return total;
|
return total;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1318,7 +1433,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
|
|
||||||
public RegionReplicaRackCostFunction(Configuration conf) {
|
public RegionReplicaRackCostFunction(Configuration conf) {
|
||||||
super(conf);
|
super(conf);
|
||||||
this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
|
this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
|
||||||
|
DEFAULT_REGION_REPLICA_RACK_COST_KEY));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -1390,4 +1506,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||||||
return rl.getStorefileSizeMB();
|
return rl.getStorefileSizeMB();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A helper function to compose the attribute name from tablename and costfunction name
|
||||||
|
*/
|
||||||
|
public static String composeAttributeName(String tableName, String costFunctionName) {
|
||||||
|
return tableName + TABLE_FUNCTION_SEP + costFunctionName;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -444,7 +444,17 @@ public class ScanQueryMatcher {
|
|||||||
colChecker = columns.checkVersions(cell, timestamp, typeByte,
|
colChecker = columns.checkVersions(cell, timestamp, typeByte,
|
||||||
mvccVersion > maxReadPointToTrackVersions);
|
mvccVersion > maxReadPointToTrackVersions);
|
||||||
//Optimize with stickyNextRow
|
//Optimize with stickyNextRow
|
||||||
stickyNextRow = colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW ? true : stickyNextRow;
|
boolean seekNextRowFromEssential = filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW &&
|
||||||
|
filter.isFamilyEssential(cell.getFamilyArray());
|
||||||
|
if (colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW || seekNextRowFromEssential) {
|
||||||
|
stickyNextRow = true;
|
||||||
|
}
|
||||||
|
if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) {
|
||||||
|
if (colChecker != MatchCode.SKIP) {
|
||||||
|
return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
|
||||||
|
}
|
||||||
|
return MatchCode.SEEK_NEXT_ROW;
|
||||||
|
}
|
||||||
return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL &&
|
return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL &&
|
||||||
colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL
|
colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL
|
||||||
: colChecker;
|
: colChecker;
|
||||||
|
@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
|
|||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
@ -108,6 +109,12 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<RegionPlan> balanceCluster(TableName tableName,
|
||||||
|
Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
x
Reference in New Issue
Block a user