HBASE-25793 Move BaseLoadBalancer.Cluster to a separated file (#3185) (#3190)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
Author: Duo Zhang, 2021-04-23 00:08:16 +08:00, committed by GitHub
parent f4f84302fa
commit a02ce95ff1
20 changed files with 1237 additions and 1064 deletions

org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class AssignRegionAction extends BalanceAction {
private final int region;
private final int server;
public AssignRegionAction(int region, int server) {
super(Type.ASSIGN_REGION);
this.region = region;
this.server = server;
}
public int getRegion() {
return region;
}
public int getServer() {
return server;
}
@Override
public BalanceAction undoAction() {
// TODO implement this. This action is not being used by the StochasticLB for now
// in case it uses it, we should implement this function.
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
}
@Override
public String toString() {
return getType() + ": " + region + ":" + server;
}
}
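
A minimal usage sketch of the class above (the index values and the cluster instance are hypothetical; doAction is the dispatch method on BalancerClusterState shown later in this commit):

  // Hypothetical sketch: propose assigning region index 3 to server index 1,
  // then apply it to an existing BalancerClusterState named "cluster".
  BalanceAction assign = new AssignRegionAction(3, 1);
  cluster.doAction(assign); // grows regionsPerServer[1] and records the assignment
  // Note: undoAction() is deliberately unsupported for ASSIGN_REGION today and
  // throws UnsupportedOperationException(HConstants.NOT_IMPLEMENTED).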

org/apache/hadoop/hbase/master/balancer/BalanceAction.java

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.yetus.audience.InterfaceAudience;
/**
* An action to move or swap a region
*/
@InterfaceAudience.Private
abstract class BalanceAction {
enum Type {
ASSIGN_REGION, MOVE_REGION, SWAP_REGIONS, NULL,
}
static final BalanceAction NULL_ACTION = new BalanceAction(Type.NULL) {
};
private final Type type;
BalanceAction(Type type) {
this.type = type;
}
/**
* Returns an Action which would undo this action
*/
BalanceAction undoAction() {
return this;
}
Type getType() {
return type;
}
@Override
public String toString() {
return type + ":";
}
}
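
The NULL sentinel above lets callers treat "no candidate found" like any other action; a small sketch, assuming a BalancerClusterState named cluster:

  // Sketch: NULL_ACTION is a safe no-op, and its undoAction() returns itself,
  // so apply/undo code paths need no special casing.
  BalanceAction action = BalanceAction.NULL_ACTION;
  if (action.getType() != BalanceAction.Type.NULL) {
    cluster.doAction(action);                // only real actions mutate the state
    cluster.doAction(action.undoAction());   // and can be rolled back the same way
  }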

org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java

@@ -0,0 +1,865 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An efficient array-based implementation similar to ClusterState for keeping the status of the
* cluster in terms of region assignment and distribution. LoadBalancers such as
* StochasticLoadBalancer use this cluster state object because hundreds of thousands of hashmap
* manipulations would be very costly, which is why this class mostly uses indexes and arrays.
* <p/>
* BalancerClusterState tracks a list of unassigned regions, region assignments, and the server
* topology in terms of server names, hostnames and racks.
*/
@InterfaceAudience.Private
class BalancerClusterState {
private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class);
ServerName[] servers;
// ServerName uniquely identifies a region server; multiple region servers can run on the same host
String[] hosts;
String[] racks;
boolean multiServersPerHost = false; // whether or not any host has more than one server
ArrayList<String> tables;
RegionInfo[] regions;
Deque<BalancerRegionLoad>[] regionLoads;
private RegionLocationFinder regionFinder;
int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality
int[] serverIndexToHostIndex; // serverIndex -> host index
int[] serverIndexToRackIndex; // serverIndex -> rack index
int[][] regionsPerServer; // serverIndex -> region list
int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list
int[][] regionsPerHost; // hostIndex -> list of regions
int[][] regionsPerRack; // rackIndex -> region list
int[][] primariesOfRegionsPerServer; // serverIndex -> sorted list of regions by primary region
// index
int[][] primariesOfRegionsPerHost; // hostIndex -> sorted list of regions by primary region index
int[][] primariesOfRegionsPerRack; // rackIndex -> sorted list of regions by primary region index
int[][] serversPerHost; // hostIndex -> list of server indexes
int[][] serversPerRack; // rackIndex -> list of server indexes
int[] regionIndexToServerIndex; // regionIndex -> serverIndex
int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
int[] regionIndexToTableIndex; // regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; // serverIndex -> tableIndex -> # regions
int[] numMaxRegionsPerTable; // tableIndex -> max number of regions in a single RS
int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
boolean hasRegionReplicas = false; // whether there are regions with replicas
Integer[] serverIndicesSortedByRegionCount;
Integer[] serverIndicesSortedByLocality;
Map<Address, Integer> serversToIndex;
Map<String, Integer> hostsToIndex;
Map<String, Integer> racksToIndex;
Map<String, Integer> tablesToIndex;
Map<RegionInfo, Integer> regionsToIndex;
float[] localityPerServer;
int numServers;
int numHosts;
int numRacks;
int numTables;
int numRegions;
int numMovedRegions = 0; // num moved regions from the initial configuration
Map<ServerName, List<RegionInfo>> clusterState;
private final RackManager rackManager;
// Maps region -> rackIndex -> locality of region on rack
private float[][] rackLocalities;
// Maps localityType -> region -> [server|rack]Index with highest locality
private int[][] regionsToMostLocalEntities;
static class DefaultRackManager extends RackManager {
@Override
public String getRack(ServerName server) {
return UNKNOWN_RACK;
}
}
BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder,
RackManager rackManager) {
this(null, clusterState, loads, regionFinder, rackManager);
}
@SuppressWarnings("unchecked")
BalancerClusterState(Collection<RegionInfo> unassignedRegions,
Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
RegionLocationFinder regionFinder, RackManager rackManager) {
if (unassignedRegions == null) {
unassignedRegions = Collections.emptyList();
}
serversToIndex = new HashMap<>();
hostsToIndex = new HashMap<>();
racksToIndex = new HashMap<>();
tablesToIndex = new HashMap<>();
// TODO: We should get the list of tables from master
tables = new ArrayList<>();
this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
numRegions = 0;
List<List<Integer>> serversPerHostList = new ArrayList<>();
List<List<Integer>> serversPerRackList = new ArrayList<>();
this.clusterState = clusterState;
this.regionFinder = regionFinder;
// Use servername and port as there can be dead servers in this list. We want everything with
// a matching hostname and port to have the same index.
for (ServerName sn : clusterState.keySet()) {
if (sn == null) {
LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " +
"skipping; unassigned regions?");
if (LOG.isTraceEnabled()) {
LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
}
continue;
}
if (serversToIndex.get(sn.getAddress()) == null) {
serversToIndex.put(sn.getAddress(), numServers++);
}
if (!hostsToIndex.containsKey(sn.getHostname())) {
hostsToIndex.put(sn.getHostname(), numHosts++);
serversPerHostList.add(new ArrayList<>(1));
}
int serverIndex = serversToIndex.get(sn.getAddress());
int hostIndex = hostsToIndex.get(sn.getHostname());
serversPerHostList.get(hostIndex).add(serverIndex);
String rack = this.rackManager.getRack(sn);
if (!racksToIndex.containsKey(rack)) {
racksToIndex.put(rack, numRacks++);
serversPerRackList.add(new ArrayList<>());
}
int rackIndex = racksToIndex.get(rack);
serversPerRackList.get(rackIndex).add(serverIndex);
}
// Count how many regions there are.
for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
numRegions += entry.getValue().size();
}
numRegions += unassignedRegions.size();
regionsToIndex = new HashMap<>(numRegions);
servers = new ServerName[numServers];
serversPerHost = new int[numHosts][];
serversPerRack = new int[numRacks][];
regions = new RegionInfo[numRegions];
regionIndexToServerIndex = new int[numRegions];
initialRegionIndexToServerIndex = new int[numRegions];
regionIndexToTableIndex = new int[numRegions];
regionIndexToPrimaryIndex = new int[numRegions];
regionLoads = new Deque[numRegions];
regionLocations = new int[numRegions][];
serverIndicesSortedByRegionCount = new Integer[numServers];
serverIndicesSortedByLocality = new Integer[numServers];
localityPerServer = new float[numServers];
serverIndexToHostIndex = new int[numServers];
serverIndexToRackIndex = new int[numServers];
regionsPerServer = new int[numServers][];
serverIndexToRegionsOffset = new int[numServers];
regionsPerHost = new int[numHosts][];
regionsPerRack = new int[numRacks][];
primariesOfRegionsPerServer = new int[numServers][];
primariesOfRegionsPerHost = new int[numHosts][];
primariesOfRegionsPerRack = new int[numRacks][];
int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
if (entry.getKey() == null) {
LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
continue;
}
int serverIndex = serversToIndex.get(entry.getKey().getAddress());
// keep the servername if this is the first server name for this hostname
// or this servername has the newest startcode.
if (servers[serverIndex] == null ||
servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
servers[serverIndex] = entry.getKey();
}
if (regionsPerServer[serverIndex] != null) {
// there is another server with the same hostAndPort in ClusterState.
// allocate the array for the total size
regionsPerServer[serverIndex] =
new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
} else {
regionsPerServer[serverIndex] = new int[entry.getValue().size()];
}
primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
serverIndicesSortedByLocality[serverIndex] = serverIndex;
}
hosts = new String[numHosts];
for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) {
hosts[entry.getValue()] = entry.getKey();
}
racks = new String[numRacks];
for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) {
racks[entry.getValue()] = entry.getKey();
}
for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
int serverIndex = serversToIndex.get(entry.getKey().getAddress());
regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];
int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
serverIndexToHostIndex[serverIndex] = hostIndex;
int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
serverIndexToRackIndex[serverIndex] = rackIndex;
for (RegionInfo region : entry.getValue()) {
registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
regionIndex++;
}
serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
}
for (RegionInfo region : unassignedRegions) {
registerRegion(region, regionIndex, -1, loads, regionFinder);
regionIndex++;
}
for (int i = 0; i < serversPerHostList.size(); i++) {
serversPerHost[i] = new int[serversPerHostList.get(i).size()];
for (int j = 0; j < serversPerHost[i].length; j++) {
serversPerHost[i][j] = serversPerHostList.get(i).get(j);
}
if (serversPerHost[i].length > 1) {
multiServersPerHost = true;
}
}
for (int i = 0; i < serversPerRackList.size(); i++) {
serversPerRack[i] = new int[serversPerRackList.get(i).size()];
for (int j = 0; j < serversPerRack[i].length; j++) {
serversPerRack[i][j] = serversPerRackList.get(i).get(j);
}
}
numTables = tables.size();
numRegionsPerServerPerTable = new int[numServers][numTables];
for (int i = 0; i < numServers; i++) {
for (int j = 0; j < numTables; j++) {
numRegionsPerServerPerTable[i][j] = 0;
}
}
for (int i = 0; i < regionIndexToServerIndex.length; i++) {
if (regionIndexToServerIndex[i] >= 0) {
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
}
}
numMaxRegionsPerTable = new int[numTables];
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
}
}
}
for (int i = 0; i < regions.length; i++) {
RegionInfo info = regions[i];
if (RegionReplicaUtil.isDefaultReplica(info)) {
regionIndexToPrimaryIndex[i] = i;
} else {
hasRegionReplicas = true;
RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
}
}
for (int i = 0; i < regionsPerServer.length; i++) {
primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
for (int j = 0; j < regionsPerServer[i].length; j++) {
int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
primariesOfRegionsPerServer[i][j] = primaryIndex;
}
// sort the regions by primaries.
Arrays.sort(primariesOfRegionsPerServer[i]);
}
// compute regionsPerHost
if (multiServersPerHost) {
for (int i = 0; i < serversPerHost.length; i++) {
int numRegionsPerHost = 0;
for (int j = 0; j < serversPerHost[i].length; j++) {
numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
}
regionsPerHost[i] = new int[numRegionsPerHost];
primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
}
for (int i = 0; i < serversPerHost.length; i++) {
int numRegionPerHostIndex = 0;
for (int j = 0; j < serversPerHost[i].length; j++) {
for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
int region = regionsPerServer[serversPerHost[i][j]][k];
regionsPerHost[i][numRegionPerHostIndex] = region;
int primaryIndex = regionIndexToPrimaryIndex[region];
primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
numRegionPerHostIndex++;
}
}
// sort the regions by primaries.
Arrays.sort(primariesOfRegionsPerHost[i]);
}
}
// compute regionsPerRack
if (numRacks > 1) {
for (int i = 0; i < serversPerRack.length; i++) {
int numRegionsPerRack = 0;
for (int j = 0; j < serversPerRack[i].length; j++) {
numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
}
regionsPerRack[i] = new int[numRegionsPerRack];
primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
}
for (int i = 0; i < serversPerRack.length; i++) {
int numRegionPerRackIndex = 0;
for (int j = 0; j < serversPerRack[i].length; j++) {
for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
int region = regionsPerServer[serversPerRack[i][j]][k];
regionsPerRack[i][numRegionPerRackIndex] = region;
int primaryIndex = regionIndexToPrimaryIndex[region];
primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
numRegionPerRackIndex++;
}
}
// sort the regions by primaries.
Arrays.sort(primariesOfRegionsPerRack[i]);
}
}
}
/** Helper for Cluster constructor to handle a region */
private void registerRegion(RegionInfo region, int regionIndex, int serverIndex,
Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder) {
String tableName = region.getTable().getNameAsString();
if (!tablesToIndex.containsKey(tableName)) {
tables.add(tableName);
tablesToIndex.put(tableName, tablesToIndex.size());
}
int tableIndex = tablesToIndex.get(tableName);
regionsToIndex.put(region, regionIndex);
regions[regionIndex] = region;
regionIndexToServerIndex[regionIndex] = serverIndex;
initialRegionIndexToServerIndex[regionIndex] = serverIndex;
regionIndexToTableIndex[regionIndex] = tableIndex;
// region load
if (loads != null) {
Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
// That could have failed if the RegionLoad is using the other regionName
if (rl == null) {
// Try getting the region load using encoded name.
rl = loads.get(region.getEncodedName());
}
regionLoads[regionIndex] = rl;
}
if (regionFinder != null) {
// region location
List<ServerName> loc = regionFinder.getTopBlockLocations(region);
regionLocations[regionIndex] = new int[loc.size()];
for (int i = 0; i < loc.size(); i++) {
regionLocations[regionIndex][i] = loc.get(i) == null ? -1 :
(serversToIndex.get(loc.get(i).getAddress()) == null ? -1 :
serversToIndex.get(loc.get(i).getAddress()));
}
}
}
/**
* Returns true iff a given server has fewer regions than the balanced amount
*/
public boolean serverHasTooFewRegions(int server) {
int minLoad = this.numRegions / numServers;
int numRegions = getNumRegions(server);
return numRegions < minLoad;
}
/**
* Retrieves and lazily initializes a field storing the locality of every region/server
* combination
*/
public float[][] getOrComputeRackLocalities() {
if (rackLocalities == null || regionsToMostLocalEntities == null) {
computeCachedLocalities();
}
return rackLocalities;
}
/**
* Lazily initializes and retrieves a mapping of region -> server for which the region has the
* highest locality
*/
public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) {
if (rackLocalities == null || regionsToMostLocalEntities == null) {
computeCachedLocalities();
}
return regionsToMostLocalEntities[type.ordinal()];
}
/**
* Looks up locality from cache of localities. Will create cache if it does not already exist.
*/
public float getOrComputeLocality(int region, int entity,
BalancerClusterState.LocalityType type) {
switch (type) {
case SERVER:
return getLocalityOfRegion(region, entity);
case RACK:
return getOrComputeRackLocalities()[region][entity];
default:
throw new IllegalArgumentException("Unsupported LocalityType: " + type);
}
}
/**
* Returns locality weighted by region size in MB. Will create locality cache if it does not
* already exist.
*/
public double getOrComputeWeightedLocality(int region, int server,
BalancerClusterState.LocalityType type) {
return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
}
/**
* Returns the size in MB from the most recent RegionLoad for region
*/
public int getRegionSizeMB(int region) {
Deque<BalancerRegionLoad> load = regionLoads[region];
// This means regions have no actual data on disk
if (load == null) {
return 0;
}
return regionLoads[region].getLast().getStorefileSizeMB();
}
/**
* Computes and caches the locality for each region/rack combination, as well as storing a
* mapping of region -> server and region -> rack such that the server and rack have the highest
* locality for the region
*/
private void computeCachedLocalities() {
rackLocalities = new float[numRegions][numRacks];
regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
// Compute localities and find most local server per region
for (int region = 0; region < numRegions; region++) {
int serverWithBestLocality = 0;
float bestLocalityForRegion = 0;
for (int server = 0; server < numServers; server++) {
// Aggregate per-rack locality
float locality = getLocalityOfRegion(region, server);
int rack = serverIndexToRackIndex[server];
int numServersInRack = serversPerRack[rack].length;
rackLocalities[region][rack] += locality / numServersInRack;
if (locality > bestLocalityForRegion) {
serverWithBestLocality = server;
bestLocalityForRegion = locality;
}
}
regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
// Find most local rack per region
int rackWithBestLocality = 0;
float bestRackLocalityForRegion = 0.0f;
for (int rack = 0; rack < numRacks; rack++) {
float rackLocality = rackLocalities[region][rack];
if (rackLocality > bestRackLocalityForRegion) {
bestRackLocalityForRegion = rackLocality;
rackWithBestLocality = rack;
}
}
regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
}
}
/**
* Maps region index to rack index
*/
public int getRackForRegion(int region) {
return serverIndexToRackIndex[regionIndexToServerIndex[region]];
}
enum LocalityType {
SERVER, RACK
}
public void doAction(BalanceAction action) {
switch (action.getType()) {
case NULL:
break;
case ASSIGN_REGION:
// FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
assert action instanceof AssignRegionAction : action.getClass();
AssignRegionAction ar = (AssignRegionAction) action;
regionsPerServer[ar.getServer()] =
addRegion(regionsPerServer[ar.getServer()], ar.getRegion());
regionMoved(ar.getRegion(), -1, ar.getServer());
break;
case MOVE_REGION:
assert action instanceof MoveRegionAction : action.getClass();
MoveRegionAction mra = (MoveRegionAction) action;
regionsPerServer[mra.getFromServer()] =
removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion());
regionsPerServer[mra.getToServer()] =
addRegion(regionsPerServer[mra.getToServer()], mra.getRegion());
regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
break;
case SWAP_REGIONS:
assert action instanceof SwapRegionsAction : action.getClass();
SwapRegionsAction a = (SwapRegionsAction) action;
regionsPerServer[a.getFromServer()] =
replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion());
regionsPerServer[a.getToServer()] =
replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion());
regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
break;
default:
throw new RuntimeException("Uknown action:" + action.getType());
}
}
/**
* Return true if the placement of region on server would lower the availability of the region in
* question
* @return true or false
*/
boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
if (!serversToIndex.containsKey(serverName.getAddress())) {
return false; // safeguard against race between cluster.servers and servers from LB method
// args
}
int server = serversToIndex.get(serverName.getAddress());
int region = regionsToIndex.get(regionInfo);
// Region replicas for same region should better assign to different servers
for (int i : regionsPerServer[server]) {
RegionInfo otherRegionInfo = regions[i];
if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
return true;
}
}
int primary = regionIndexToPrimaryIndex[region];
if (primary == -1) {
return false;
}
// there is a subset relation for server < host < rack
// check server first
if (contains(primariesOfRegionsPerServer[server], primary)) {
// check for whether there are other servers that we can place this region
for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
return true; // meaning there is a better server
}
}
return false; // there is not a better server to place this
}
// check host
if (multiServersPerHost) {
// these arrays would only be allocated if we have more than one server per host
int host = serverIndexToHostIndex[server];
if (contains(primariesOfRegionsPerHost[host], primary)) {
// check for whether there are other hosts that we can place this region
for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
return true; // meaning there is a better host
}
}
return false; // there is not a better host to place this
}
}
// check rack
if (numRacks > 1) {
int rack = serverIndexToRackIndex[server];
if (contains(primariesOfRegionsPerRack[rack], primary)) {
// check for whether there are other racks that we can place this region
for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
return true; // meaning there is a better rack
}
}
return false; // there is not a better rack to place this
}
}
return false;
}
void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
if (!serversToIndex.containsKey(serverName.getAddress())) {
return;
}
int server = serversToIndex.get(serverName.getAddress());
int region = regionsToIndex.get(regionInfo);
doAction(new AssignRegionAction(region, server));
}
void regionMoved(int region, int oldServer, int newServer) {
regionIndexToServerIndex[region] = newServer;
if (initialRegionIndexToServerIndex[region] == newServer) {
numMovedRegions--; // region moved back to original location
} else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
numMovedRegions++; // region moved from original location
}
int tableIndex = regionIndexToTableIndex[region];
if (oldServer >= 0) {
numRegionsPerServerPerTable[oldServer][tableIndex]--;
}
numRegionsPerServerPerTable[newServer][tableIndex]++;
// check whether this caused maxRegionsPerTable in the new Server to be updated
if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
} else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex]
+ 1) == numMaxRegionsPerTable[tableIndex]) {
// recompute maxRegionsPerTable since the previous value was coming from the old server
numMaxRegionsPerTable[tableIndex] = 0;
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
}
}
}
// update for servers
int primary = regionIndexToPrimaryIndex[region];
if (oldServer >= 0) {
primariesOfRegionsPerServer[oldServer] =
removeRegion(primariesOfRegionsPerServer[oldServer], primary);
}
primariesOfRegionsPerServer[newServer] =
addRegionSorted(primariesOfRegionsPerServer[newServer], primary);
// update for hosts
if (multiServersPerHost) {
int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
int newHost = serverIndexToHostIndex[newServer];
if (newHost != oldHost) {
regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
primariesOfRegionsPerHost[newHost] =
addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
if (oldHost >= 0) {
regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
primariesOfRegionsPerHost[oldHost] =
removeRegion(primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
}
}
}
// update for racks
if (numRacks > 1) {
int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
int newRack = serverIndexToRackIndex[newServer];
if (newRack != oldRack) {
regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
primariesOfRegionsPerRack[newRack] =
addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
if (oldRack >= 0) {
regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
primariesOfRegionsPerRack[oldRack] =
removeRegion(primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
}
}
}
}
int[] removeRegion(int[] regions, int regionIndex) {
// TODO: this may be costly. Consider using linked lists
int[] newRegions = new int[regions.length - 1];
int i = 0;
for (i = 0; i < regions.length; i++) {
if (regions[i] == regionIndex) {
break;
}
newRegions[i] = regions[i];
}
System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
return newRegions;
}
int[] addRegion(int[] regions, int regionIndex) {
int[] newRegions = new int[regions.length + 1];
System.arraycopy(regions, 0, newRegions, 0, regions.length);
newRegions[newRegions.length - 1] = regionIndex;
return newRegions;
}
int[] addRegionSorted(int[] regions, int regionIndex) {
int[] newRegions = new int[regions.length + 1];
int i = 0;
for (i = 0; i < regions.length; i++) { // find the index to insert
if (regions[i] > regionIndex) {
break;
}
}
System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half
newRegions[i] = regionIndex;
return newRegions;
}
int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
int i = 0;
for (i = 0; i < regions.length; i++) {
if (regions[i] == regionIndex) {
regions[i] = newRegionIndex;
break;
}
}
return regions;
}
void sortServersByRegionCount() {
Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
}
int getNumRegions(int server) {
return regionsPerServer[server].length;
}
boolean contains(int[] arr, int val) {
return Arrays.binarySearch(arr, val) >= 0;
}
private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
int getLowestLocalityRegionOnServer(int serverIndex) {
if (regionFinder != null) {
float lowestLocality = 1.0f;
int lowestLocalityRegionIndex = -1;
if (regionsPerServer[serverIndex].length == 0) {
// No regions on that region server
return -1;
}
for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
int regionIndex = regionsPerServer[serverIndex][j];
HDFSBlocksDistribution distribution =
regionFinder.getBlockDistribution(regions[regionIndex]);
float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
// skip empty region
if (distribution.getUniqueBlocksTotalWeight() == 0) {
continue;
}
if (locality < lowestLocality) {
lowestLocality = locality;
lowestLocalityRegionIndex = j;
}
}
if (lowestLocalityRegionIndex == -1) {
return -1;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Lowest locality region is " +
regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
.getRegionNameAsString() +
" with locality " + lowestLocality + " and its region server contains " +
regionsPerServer[serverIndex].length + " regions");
}
return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
} else {
return -1;
}
}
float getLocalityOfRegion(int region, int server) {
if (regionFinder != null) {
HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
return distribution.getBlockLocalityIndex(servers[server].getHostname());
} else {
return 0f;
}
}
void setNumRegions(int numRegions) {
this.numRegions = numRegions;
}
void setNumMovedRegions(int numMovedRegions) {
this.numMovedRegions = numMovedRegions;
}
@Override
public String toString() {
StringBuilder desc = new StringBuilder("Cluster={servers=[");
for (ServerName sn : servers) {
desc.append(sn.getAddress().toString()).append(", ");
}
desc.append("], serverIndicesSortedByRegionCount=")
.append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
.append(Arrays.deepToString(regionsPerServer));
desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
.append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
.append('}');
return desc.toString();
}
}
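
To illustrate the index-array bookkeeping this class relies on, here is a self-contained sketch (made-up values, not part of the patch) of how the sorted primariesOfRegionsPerServer arrays make the replica co-location check in wouldLowerAvailability a binary search rather than a hashmap lookup:

  // Made-up values: primary region indexes already hosted on some server, kept sorted.
  int[] primariesOnServer = { 2, 5, 9 };
  int primaryOfCandidate = 5; // primary index of the region replica we want to place
  // Same test as contains(): a hit means a replica of this region is already there,
  // so placing another replica on this server would lower availability.
  boolean wouldColocate = java.util.Arrays.binarySearch(primariesOnServer, primaryOfCandidate) >= 0;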

org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java

@@ -28,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
abstract class CandidateGenerator {
-  abstract BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster);
+  abstract BalanceAction generate(BalancerClusterState cluster);
  /**
   * From a list of regions pick a random one. Null can be returned which
@@ -42,7 +42,7 @@ abstract class CandidateGenerator {
   * @return a random {@link RegionInfo} or null if an asymmetrical move is
   *         suggested.
   */
-  int pickRandomRegion(BaseLoadBalancer.Cluster cluster, int server,
+  int pickRandomRegion(BalancerClusterState cluster, int server,
      double chanceOfNoSwap) {
    // Check to see if this is just a move.
    if (cluster.regionsPerServer[server].length == 0
@@ -54,7 +54,7 @@ abstract class CandidateGenerator {
    return cluster.regionsPerServer[server][rand];
  }
-  int pickRandomServer(BaseLoadBalancer.Cluster cluster) {
+  int pickRandomServer(BalancerClusterState cluster) {
    if (cluster.numServers < 1) {
      return -1;
    }
@@ -62,7 +62,7 @@ abstract class CandidateGenerator {
    return StochasticLoadBalancer.RANDOM.nextInt(cluster.numServers);
  }
-  int pickRandomRack(BaseLoadBalancer.Cluster cluster) {
+  int pickRandomRack(BalancerClusterState cluster) {
    if (cluster.numRacks < 1) {
      return -1;
    }
@@ -70,7 +70,7 @@ abstract class CandidateGenerator {
    return StochasticLoadBalancer.RANDOM.nextInt(cluster.numRacks);
  }
-  int pickOtherRandomServer(BaseLoadBalancer.Cluster cluster, int serverIndex) {
+  int pickOtherRandomServer(BalancerClusterState cluster, int serverIndex) {
    if (cluster.numServers < 2) {
      return -1;
    }
@@ -82,7 +82,7 @@ abstract class CandidateGenerator {
    }
  }
-  int pickOtherRandomRack(BaseLoadBalancer.Cluster cluster, int rackIndex) {
+  int pickOtherRandomRack(BalancerClusterState cluster, int rackIndex) {
    if (cluster.numRacks < 2) {
      return -1;
    }
@@ -94,10 +94,10 @@ abstract class CandidateGenerator {
    }
  }
-  BaseLoadBalancer.Cluster.Action pickRandomRegions(BaseLoadBalancer.Cluster cluster,
+  BalanceAction pickRandomRegions(BalancerClusterState cluster,
      int thisServer, int otherServer) {
    if (thisServer < 0 || otherServer < 0) {
-      return BaseLoadBalancer.Cluster.NullAction;
+      return BalanceAction.NULL_ACTION;
    }
    // Decide who is most likely to need another region
@@ -114,20 +114,20 @@ abstract class CandidateGenerator {
    return getAction(thisServer, thisRegion, otherServer, otherRegion);
  }
-  protected BaseLoadBalancer.Cluster.Action getAction(int fromServer, int fromRegion,
+  protected BalanceAction getAction(int fromServer, int fromRegion,
      int toServer, int toRegion) {
    if (fromServer < 0 || toServer < 0) {
-      return BaseLoadBalancer.Cluster.NullAction;
+      return BalanceAction.NULL_ACTION;
    }
    if (fromRegion >= 0 && toRegion >= 0) {
-      return new BaseLoadBalancer.Cluster.SwapRegionsAction(fromServer, fromRegion,
+      return new SwapRegionsAction(fromServer, fromRegion,
        toServer, toRegion);
    } else if (fromRegion >= 0) {
-      return new BaseLoadBalancer.Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
+      return new MoveRegionAction(fromRegion, fromServer, toServer);
    } else if (toRegion >= 0) {
-      return new BaseLoadBalancer.Cluster.MoveRegionAction(toRegion, toServer, fromServer);
+      return new MoveRegionAction(toRegion, toServer, fromServer);
    } else {
-      return BaseLoadBalancer.Cluster.NullAction;
+      return BalanceAction.NULL_ACTION;
    }
  }
}
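
With the rename, a custom generator is written directly against the new top-level types; a minimal, purely illustrative sketch (this class is not part of the commit):

  // Illustrative only: a trivial generator using the renamed API surface.
  class RandomMoveCandidateGenerator extends CandidateGenerator {
    @Override
    BalanceAction generate(BalancerClusterState cluster) {
      int fromServer = pickRandomServer(cluster);
      int toServer = pickOtherRandomServer(cluster, fromServer);
      if (fromServer < 0 || toServer < 0) {
        return BalanceAction.NULL_ACTION; // nothing sensible to propose
      }
      int region = pickRandomRegion(cluster, fromServer, 0.5);
      // getAction() returns SwapRegionsAction, MoveRegionAction or NULL_ACTION
      // depending on which region indexes are valid.
      return getAction(fromServer, region, toServer, -1);
    }
  }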

org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java

@@ -559,13 +559,13 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
  private class FavoredNodeLocalityPicker extends CandidateGenerator {
    @Override
-    protected Cluster.Action generate(Cluster cluster) {
+    protected BalanceAction generate(BalancerClusterState cluster) {
      int thisServer = pickRandomServer(cluster);
      int thisRegion;
      if (thisServer == -1) {
        LOG.trace("Could not pick lowest local region server");
-        return Cluster.NullAction;
+        return BalanceAction.NULL_ACTION;
      } else {
        // Pick lowest local region on this server
        thisRegion = pickLowestLocalRegionOnServer(cluster, thisServer);
@@ -575,7 +575,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
          LOG.trace("Could not pick lowest local region even when region server held "
            + cluster.regionsPerServer[thisServer].length + " regions");
        }
-        return Cluster.NullAction;
+        return BalanceAction.NULL_ACTION;
      }
      RegionInfo hri = cluster.regions[thisRegion];
@@ -587,7 +587,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
        } else {
          // No FN, ignore
          LOG.trace("Ignoring, no favored nodes for region: " + hri);
-          return Cluster.NullAction;
+          return BalanceAction.NULL_ACTION;
        }
      } else {
        // Pick other favored node with the highest locality
@@ -596,7 +596,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
      return getAction(thisServer, thisRegion, otherServer, -1);
    }
-    private int getDifferentFavoredNode(Cluster cluster, List<ServerName> favoredNodes,
+    private int getDifferentFavoredNode(BalancerClusterState cluster, List<ServerName> favoredNodes,
        int currentServer) {
      List<Integer> fnIndex = new ArrayList<>();
      for (ServerName sn : favoredNodes) {
@@ -618,7 +618,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
      return highestLocalRSIndex;
    }
-    private int pickLowestLocalRegionOnServer(Cluster cluster, int server) {
+    private int pickLowestLocalRegionOnServer(BalancerClusterState cluster, int server) {
      return cluster.getLowestLocalityRegionOnServer(server);
    }
  }
@@ -630,7 +630,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
  class FavoredNodeLoadPicker extends CandidateGenerator {
    @Override
-    Cluster.Action generate(Cluster cluster) {
+    BalanceAction generate(BalancerClusterState cluster) {
      cluster.sortServersByRegionCount();
      int thisServer = pickMostLoadedServer(cluster);
      int thisRegion = pickRandomRegion(cluster, thisServer, 0);
@@ -641,7 +641,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
        if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
          otherServer = pickLeastLoadedServer(cluster, thisServer);
        } else {
-          return Cluster.NullAction;
+          return BalanceAction.NULL_ACTION;
        }
      } else {
        otherServer = pickLeastLoadedFNServer(cluster, favoredNodes, thisServer);
@@ -649,7 +649,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
      return getAction(thisServer, thisRegion, otherServer, -1);
    }
-    private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
+    private int pickLeastLoadedServer(final BalancerClusterState cluster, int thisServer) {
      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
      int index;
      for (index = 0; index < servers.length ; index++) {
@@ -660,8 +660,8 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
      return servers[index];
    }
-    private int pickLeastLoadedFNServer(final Cluster cluster, List<ServerName> favoredNodes,
-        int currentServerIndex) {
+    private int pickLeastLoadedFNServer(final BalancerClusterState cluster,
+        List<ServerName> favoredNodes, int currentServerIndex) {
      List<Integer> fnIndex = new ArrayList<>();
      for (ServerName sn : favoredNodes) {
        if (cluster.serversToIndex.containsKey(sn.getAddress())) {
@@ -682,7 +682,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
      return leastLoadedFN;
    }
-    private int pickMostLoadedServer(final Cluster cluster) {
+    private int pickMostLoadedServer(final BalancerClusterState cluster) {
      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
      int index;
      for (index = servers.length - 1; index > 0 ; index--) {

org/apache/hadoop/hbase/master/balancer/HeterogeneousRegionCountCostFunction.java

@@ -120,7 +120,7 @@ public class HeterogeneousRegionCountCostFunction extends StochasticLoadBalancer
   * any costly calculation.
   */
  @Override
-  void init(final BaseLoadBalancer.Cluster cluster) {
+  void init(final BalancerClusterState cluster) {
    this.cluster = cluster;
    this.loadRules();
  }

org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java

@@ -24,14 +24,14 @@ import org.apache.yetus.audience.InterfaceAudience;
class LoadCandidateGenerator extends CandidateGenerator {
  @Override
-  BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
+  BalanceAction generate(BalancerClusterState cluster) {
    cluster.sortServersByRegionCount();
    int thisServer = pickMostLoadedServer(cluster, -1);
    int otherServer = pickLeastLoadedServer(cluster, thisServer);
    return pickRandomRegions(cluster, thisServer, otherServer);
  }
-  private int pickLeastLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) {
+  private int pickLeastLoadedServer(final BalancerClusterState cluster, int thisServer) {
    Integer[] servers = cluster.serverIndicesSortedByRegionCount;
    int index = 0;
@@ -44,7 +44,7 @@ class LoadCandidateGenerator extends CandidateGenerator {
    return servers[index];
  }
-  private int pickMostLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) {
+  private int pickMostLoadedServer(final BalancerClusterState cluster, int thisServer) {
    Integer[] servers = cluster.serverIndicesSortedByRegionCount;
    int index = servers.length - 1;

org/apache/hadoop/hbase/master/balancer/LocalityBasedCandidateGenerator.java

@@ -26,7 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
class LocalityBasedCandidateGenerator extends CandidateGenerator {
  @Override
-  BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
+  BalanceAction generate(BalancerClusterState cluster) {
    // iterate through regions until you find one that is not on ideal host
    // start from a random point to avoid always balance the regions in front
    if (cluster.numRegions > 0) {
@@ -35,20 +35,20 @@ class LocalityBasedCandidateGenerator extends CandidateGenerator {
        int region = (startIndex + i) % cluster.numRegions;
        int currentServer = cluster.regionIndexToServerIndex[region];
        if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(
-            BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]) {
-          Optional<BaseLoadBalancer.Cluster.Action> potential = tryMoveOrSwap(cluster,
+            BalancerClusterState.LocalityType.SERVER)[region]) {
+          Optional<BalanceAction> potential = tryMoveOrSwap(cluster,
            currentServer, region, cluster.getOrComputeRegionsToMostLocalEntities(
-              BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]);
+              BalancerClusterState.LocalityType.SERVER)[region]);
          if (potential.isPresent()) {
            return potential.get();
          }
        }
      }
    }
-    return BaseLoadBalancer.Cluster.NullAction;
+    return BalanceAction.NULL_ACTION;
  }
-  private Optional<BaseLoadBalancer.Cluster.Action> tryMoveOrSwap(BaseLoadBalancer.Cluster cluster,
+  private Optional<BalanceAction> tryMoveOrSwap(BalancerClusterState cluster,
      int fromServer, int fromRegion, int toServer) {
    // Try move first. We know apriori fromRegion has the highest locality on toServer
    if (cluster.serverHasTooFewRegions(toServer)) {
@@ -74,8 +74,8 @@ class LocalityBasedCandidateGenerator extends CandidateGenerator {
    return Optional.empty();
  }
-  private double getWeightedLocality(BaseLoadBalancer.Cluster cluster, int region, int server) {
+  private double getWeightedLocality(BalancerClusterState cluster, int region, int server) {
    return cluster.getOrComputeWeightedLocality(region, server,
-      BaseLoadBalancer.Cluster.LocalityType.SERVER);
+      BalancerClusterState.LocalityType.SERVER);
  }
}

org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class MoveRegionAction extends BalanceAction {
private final int region;
private final int fromServer;
private final int toServer;
public MoveRegionAction(int region, int fromServer, int toServer) {
super(Type.MOVE_REGION);
this.fromServer = fromServer;
this.region = region;
this.toServer = toServer;
}
public int getRegion() {
return region;
}
public int getFromServer() {
return fromServer;
}
public int getToServer() {
return toServer;
}
@Override
public BalanceAction undoAction() {
return new MoveRegionAction(region, toServer, fromServer);
}
@Override
public String toString() {
return getType() + ": " + region + ":" + fromServer + " -> " + toServer;
}
}
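
A small sketch of the undo symmetry above (the indexes are made up):

  // Made-up indexes: move region 42 from server 0 to server 3, then derive the inverse.
  MoveRegionAction move = new MoveRegionAction(42, 0, 3);
  BalanceAction back = move.undoAction();
  // move.toString() -> "MOVE_REGION: 42:0 -> 3"
  // back.toString() -> "MOVE_REGION: 42:3 -> 0"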

org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java

@@ -83,10 +83,10 @@ class RegionReplicaCandidateGenerator extends CandidateGenerator {
  }
  @Override
-  BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
+  BalanceAction generate(BalancerClusterState cluster) {
    int serverIndex = pickRandomServer(cluster);
    if (cluster.numServers <= 1 || serverIndex == -1) {
-      return BaseLoadBalancer.Cluster.NullAction;
+      return BalanceAction.NULL_ACTION;
    }
    int regionIndex = selectCoHostedRegionPerGroup(

org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java

@@ -271,7 +271,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
    // construct a Cluster object with clusterMap and rest of the
    // argument as defaults
-    Cluster c = new Cluster(loadOfOneTable, null, this.regionFinder, this.rackManager);
+    BalancerClusterState c =
+      new BalancerClusterState(loadOfOneTable, null, this.regionFinder, this.rackManager);
    if (!this.needsBalance(tableName, c) && !this.overallNeedsBalance()) {
      return null;
    }

org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java

@@ -39,12 +39,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
+import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
@@ -307,7 +302,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
  }
  @Override
-  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
+  protected synchronized boolean areSomeRegionReplicasColocated(BalancerClusterState c) {
    regionReplicaHostCostFunction.init(c);
    if (regionReplicaHostCostFunction.cost() > 0) {
      return true;
@@ -320,7 +315,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
  }
  @Override
-  protected boolean needsBalance(TableName tableName, Cluster cluster) {
+  protected boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
      if (LOG.isDebugEnabled()) {
@@ -367,8 +362,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    return !balanced;
  }
-  @InterfaceAudience.Private
-  Cluster.Action nextAction(Cluster cluster) {
+  BalanceAction nextAction(BalancerClusterState cluster) {
    return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
      .generate(cluster);
  }
@@ -406,7 +400,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    //The clusterState that is given to this method contains the state
    //of all the regions in the table(s) (that's true today)
    // Keep track of servers to iterate through them.
-    Cluster cluster = new Cluster(loadOfOneTable, loads, finder, rackManager);
+    BalancerClusterState cluster =
+      new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);
    long startTime = EnvironmentEdgeManager.currentTime();
@@ -446,9 +441,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    long step;
    for (step = 0; step < computedMaxSteps; step++) {
-      Cluster.Action action = nextAction(cluster);
-      if (action.type == Type.NULL) {
+      BalanceAction action = nextAction(cluster);
+      if (action.getType() == BalanceAction.Type.NULL) {
        continue;
      }
@@ -467,7 +462,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
      } else {
        // Put things back the way they were before.
        // TODO: undo by remembering old values
-        Action undoAction = action.undoAction();
+        BalanceAction undoAction = action.undoAction();
        cluster.doAction(undoAction);
        updateCostsWithAction(cluster, undoAction);
      }
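
The hunks above leave the overall balancing loop unchanged; roughly, and with simplified cost bookkeeping (a sketch, not the full method body):

  // Simplified shape of the step loop after the rename (illustrative only).
  for (long step = 0; step < computedMaxSteps; step++) {
    BalanceAction action = nextAction(cluster);
    if (action.getType() == BalanceAction.Type.NULL) {
      continue; // the generator had nothing to propose this round
    }
    cluster.doAction(action);
    updateCostsWithAction(cluster, action);
    double newCost = computeCost(cluster, currentCost);
    if (newCost < currentCost) {
      currentCost = newCost; // keep the improvement
    } else {
      BalanceAction undoAction = action.undoAction(); // otherwise roll back
      cluster.doAction(undoAction);
      updateCostsWithAction(cluster, undoAction);
    }
  }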
@@ -588,7 +583,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   * @param cluster The state of the cluster
   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
   */
-  private List<RegionPlan> createRegionPlans(Cluster cluster) {
+  private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) {
    List<RegionPlan> plans = new LinkedList<>();
    for (int regionIndex = 0;
        regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
@@ -639,13 +634,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    }
  }
-  protected void initCosts(Cluster cluster) {
+  protected void initCosts(BalancerClusterState cluster) {
    for (CostFunction c:costFunctions) {
      c.init(cluster);
    }
  }
-  protected void updateCostsWithAction(Cluster cluster, Action action) {
+  protected void updateCostsWithAction(BalancerClusterState cluster, BalanceAction action) {
    for (CostFunction c : costFunctions) {
      c.postAction(action);
    }
@@ -674,7 +669,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   * @return a double of a cost associated with the proposed cluster state. This cost is an
   *         aggregate of all individual cost functions.
   */
-  protected double computeCost(Cluster cluster, double previousCost) {
+  protected double computeCost(BalancerClusterState cluster, double previousCost) {
    double total = 0;
    for (int i = 0; i < costFunctions.size(); i++) {
@@ -702,7 +697,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
  static class RandomCandidateGenerator extends CandidateGenerator {
    @Override
-    Cluster.Action generate(Cluster cluster) {
+    BalanceAction generate(BalancerClusterState cluster) {
      int thisServer = pickRandomServer(cluster);
@@ -719,7 +714,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   */
  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
    @Override
-    Cluster.Action generate(Cluster cluster) {
+    BalanceAction generate(BalancerClusterState cluster) {
      int rackIndex = pickRandomRack(cluster);
      if (cluster.numRacks <= 1 || rackIndex == -1) {
        return super.generate(cluster);
@@ -753,7 +748,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    private float multiplier = 0;
-    protected Cluster cluster;
+    protected BalancerClusterState cluster;
    public CostFunction(Configuration c) {
    }
@@ -772,7 +767,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    /** Called once per LB invocation to give the cost function
     * to initialize it's state, and perform any costly calculation.
     */
-    void init(Cluster cluster) {
+    void init(BalancerClusterState cluster) {
      this.cluster = cluster;
    }
@@ -780,24 +775,25 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     * an opportunity to update it's state. postAction() is always
     * called at least once before cost() is called with the cluster
     * that this action is performed on. */
-    void postAction(Action action) {
-      switch (action.type) {
+    void postAction(BalanceAction action) {
+      switch (action.getType()) {
case NULL: break; case NULL:
break;
case ASSIGN_REGION: case ASSIGN_REGION:
AssignRegionAction ar = (AssignRegionAction) action; AssignRegionAction ar = (AssignRegionAction) action;
regionMoved(ar.region, -1, ar.server); regionMoved(ar.getRegion(), -1, ar.getServer());
break; break;
case MOVE_REGION: case MOVE_REGION:
MoveRegionAction mra = (MoveRegionAction) action; MoveRegionAction mra = (MoveRegionAction) action;
regionMoved(mra.region, mra.fromServer, mra.toServer); regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
break; break;
case SWAP_REGIONS: case SWAP_REGIONS:
SwapRegionsAction a = (SwapRegionsAction) action; SwapRegionsAction a = (SwapRegionsAction) action;
regionMoved(a.fromRegion, a.fromServer, a.toServer); regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
regionMoved(a.toRegion, a.toServer, a.fromServer); regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
break; break;
default: default:
throw new RuntimeException("Uknown action:" + action.type); throw new RuntimeException("Uknown action:" + action.getType());
} }
} }
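The postAction switch above exists so that a cost function can update cached bookkeeping incrementally through regionMoved instead of rescanning the whole cluster after every candidate action. A standalone illustration of that idea, with assumed, simplified bookkeeping (IncrementalSkewSketch is not HBase's CostFunction, and the real regionMoved also receives the region index):

// Tracks regions per server and a skew-style cost that is updated in O(1) per move,
// mirroring the regionMoved(region, oldServer, newServer) callback pattern above.
public class IncrementalSkewSketch {
  private final int[] regionsPerServer;
  private double sumOfSquares; // cost proxy: sum of (regions on server)^2

  public IncrementalSkewSketch(int[] initialRegionsPerServer) {
    this.regionsPerServer = initialRegionsPerServer.clone();
    for (int c : regionsPerServer) {
      sumOfSquares += (double) c * c;
    }
  }

  /** Analogue of regionMoved: oldServer == -1 models ASSIGN_REGION (region was unassigned). */
  public void regionMoved(int oldServer, int newServer) {
    if (oldServer >= 0) {
      sumOfSquares -= (double) regionsPerServer[oldServer] * regionsPerServer[oldServer];
      regionsPerServer[oldServer]--;
      sumOfSquares += (double) regionsPerServer[oldServer] * regionsPerServer[oldServer];
    }
    sumOfSquares -= (double) regionsPerServer[newServer] * regionsPerServer[newServer];
    regionsPerServer[newServer]++;
    sumOfSquares += (double) regionsPerServer[newServer] * regionsPerServer[newServer];
  }

  public double cost() {
    return sumOfSquares;
  }

  public static void main(String[] args) {
    IncrementalSkewSketch c = new IncrementalSkewSketch(new int[] { 4, 1, 1 });
    System.out.println(c.cost()); // 18.0
    c.regionMoved(0, 1);          // MOVE_REGION from server 0 to server 1
    System.out.println(c.cost()); // 14.0
  }
}

A SWAP_REGIONS action is then just two such calls, which is exactly what the switch above does.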
@ -948,7 +944,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
@Override @Override
void init(Cluster cluster) { void init(BalancerClusterState cluster) {
super.init(cluster); super.init(cluster);
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(), LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
cluster.numServers, cluster.numRegions); cluster.numServers, cluster.numRegions);
@ -1072,7 +1068,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
abstract int regionIndexToEntityIndex(int region); abstract int regionIndexToEntityIndex(int region);
@Override @Override
void init(Cluster cluster) { void init(BalancerClusterState cluster) {
super.init(cluster); super.init(cluster);
locality = 0.0; locality = 0.0;
bestLocality = 0.0; bestLocality = 0.0;
@ -1300,7 +1296,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
@Override @Override
void init(Cluster cluster) { void init(BalancerClusterState cluster) {
super.init(cluster); super.init(cluster);
// max cost is the case where every region replica is hosted together regardless of host // max cost is the case where every region replica is hosted together regardless of host
maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0; maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
@ -1313,7 +1309,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
} }
long getMaxCost(Cluster cluster) { long getMaxCost(BalancerClusterState cluster) {
if (!cluster.hasRegionReplicas) { if (!cluster.hasRegionReplicas) {
return 0; // short circuit return 0; // short circuit
} }
@ -1412,7 +1408,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
@Override @Override
void init(Cluster cluster) { void init(BalancerClusterState cluster) {
this.cluster = cluster; this.cluster = cluster;
if (cluster.numRacks <= 1) { if (cluster.numRacks <= 1) {
maxCost = 0; maxCost = 0;

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class SwapRegionsAction extends BalanceAction {
private final int fromServer;
private final int fromRegion;
private final int toServer;
private final int toRegion;
SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
super(Type.SWAP_REGIONS);
this.fromServer = fromServer;
this.fromRegion = fromRegion;
this.toServer = toServer;
this.toRegion = toRegion;
}
public int getFromServer() {
return fromServer;
}
public int getFromRegion() {
return fromRegion;
}
public int getToServer() {
return toServer;
}
public int getToRegion() {
return toRegion;
}
@Override
public BalanceAction undoAction() {
return new SwapRegionsAction(fromServer, toRegion, toServer, fromRegion);
}
@Override
public String toString() {
return getType() + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
}
}
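The undoAction of the new SwapRegionsAction class swaps the same two regions back by exchanging the region arguments. An illustrative snippet exercising that symmetry (SwapUndoExample is a hypothetical class; it would have to live in the same package because the constructor shown above is package-private):

package org.apache.hadoop.hbase.master.balancer;

// Illustrative only: exercises the undo relationship of the class above.
public class SwapUndoExample {
  public static void main(String[] args) {
    // Swap region 5 (on server 1) with region 9 (on server 3).
    SwapRegionsAction swap = new SwapRegionsAction(1, 5, 3, 9);
    SwapRegionsAction undo = (SwapRegionsAction) swap.undoAction();
    // Applying swap and then undo leaves the assignment unchanged.
    System.out.println(swap); // SWAP_REGIONS: 5:1 <-> 9:3
    System.out.println(undo); // SWAP_REGIONS: 9:1 <-> 5:3
  }
}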

View File

@ -376,8 +376,8 @@ public class BalancerTestBase {
return mockClusterServers(mockCluster, -1);
}
- protected BaseLoadBalancer.Cluster mockCluster(int[] mockCluster) {
- return new BaseLoadBalancer.Cluster(
+ protected BalancerClusterState mockCluster(int[] mockCluster) {
+ return new BalancerClusterState(
mockClusterServers(mockCluster, -1), null, null, null);
}
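For orientation in the test changes that follow: after this commit, test helpers build the balancer's state object directly from a server-to-regions map. A hedged sketch of that usage (ClusterStateSketch is a hypothetical class; it assumes, like the tests, that it sits in org.apache.hadoop.hbase.master.balancer, since BalancerClusterState and this constructor are not public API):

package org.apache.hadoop.hbase.master.balancer;

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;

// Illustrative only: mirrors what mockCluster(...) now returns in BalancerTestBase.
public class ClusterStateSketch {
  static BalancerClusterState emptyCluster() {
    TreeMap<ServerName, List<RegionInfo>> clusterState = new TreeMap<>();
    clusterState.put(ServerName.valueOf("host1", 16010, 1L), new ArrayList<>());
    clusterState.put(ServerName.valueOf("host2", 16010, 1L), new ArrayList<>());
    // loads, region finder and rack manager are optional and may be null in tests,
    // exactly as the hunks in this commit show.
    return new BalancerClusterState(clusterState, null, null, null);
  }
}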

View File

@ -47,8 +47,6 @@ import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.ServerManager;
- import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
- import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.net.DNSToSwitchMapping;
@ -309,7 +307,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
// cluster is created (constructor code) would make sure the indices of
// the servers are in the order in which it is inserted in the clusterState
// map (linkedhashmap is important). A similar thing applies to the region lists
- Cluster cluster = new Cluster(clusterState, null, null, rackManager);
+ BalancerClusterState cluster = new BalancerClusterState(clusterState, null, null, rackManager);
// check whether a move of region1 from servers[0] to servers[1] would lower
// the availability of region1
assertTrue(cluster.wouldLowerAvailability(hri1, servers[1]));
@ -326,7 +324,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
// now lets have servers[1] host replica_of_region2
list1.add(RegionReplicaUtil.getRegionInfoForReplica(hri3, 1));
// create a new clusterState with the above change
- cluster = new Cluster(clusterState, null, null, rackManager);
+ cluster = new BalancerClusterState(clusterState, null, null, rackManager);
// now check whether a move of a replica from servers[0] to servers[1] would lower
// the availability of region2
assertTrue(cluster.wouldLowerAvailability(hri3, servers[1]));
@ -338,14 +336,14 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2
clusterState.put(servers[10], new ArrayList<>()); //servers[10], rack3 hosts no region
// create a cluster with the above clusterState
- cluster = new Cluster(clusterState, null, null, rackManager);
+ cluster = new BalancerClusterState(clusterState, null, null, rackManager);
// check whether a move of region1 from servers[0],rack1 to servers[6],rack2 would
// lower the availability
assertTrue(cluster.wouldLowerAvailability(hri1, servers[0]));
// now create a cluster without the rack manager
- cluster = new Cluster(clusterState, null, null, null);
+ cluster = new BalancerClusterState(clusterState, null, null, null);
// now repeat check whether a move of region1 from servers[0] to servers[6] would
// lower the availability
assertTrue(!cluster.wouldLowerAvailability(hri1, servers[6]));
@ -383,7 +381,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
// cluster is created (constructor code) would make sure the indices of
// the servers are in the order in which it is inserted in the clusterState
// map (linkedhashmap is important).
- Cluster cluster = new Cluster(clusterState, null, null, rackManager);
+ BalancerClusterState cluster = new BalancerClusterState(clusterState, null, null, rackManager);
// check whether moving region1 from servers[1] to servers[2] would lower availability
assertTrue(!cluster.wouldLowerAvailability(hri1, servers[2]));
@ -405,7 +403,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2
clusterState.put(servers[12], list3); //servers[12], rack3 hosts replica_of_region2
// create a cluster with the above clusterState
- cluster = new Cluster(clusterState, null, null, rackManager);
+ cluster = new BalancerClusterState(clusterState, null, null, rackManager);
// check whether a move of replica_of_region2 from servers[12],rack3 to servers[0],rack1 would
// lower the availability
assertTrue(!cluster.wouldLowerAvailability(hri4, servers[0]));
@ -487,7 +485,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
assignRegions(regions, oldServers, clusterState);
// should not throw exception:
- BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, null, null);
+ BalancerClusterState cluster = new BalancerClusterState(clusterState, null, null, null);
assertEquals(101 + 9, cluster.numRegions);
assertEquals(10, cluster.numServers); // only 10 servers because they share the same host + port
@ -533,12 +531,15 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
Lists.newArrayList(servers.get(0), servers.get(1)));
when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn(
Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
- when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn(
- Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus
+ // this server does not exists in clusterStatus
+ when(locationFinder.getTopBlockLocations(regions.get(43)))
+ .thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0)));
- BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null);
+ BalancerClusterState cluster =
+ new BalancerClusterState(clusterState, null, locationFinder, null);
- int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0));
+ // this is ok, it is just a test
+ int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0));
int r1 = ArrayUtils.indexOf(cluster.regions, regions.get(1));
int r10 = ArrayUtils.indexOf(cluster.regions, regions.get(10));
int r42 = ArrayUtils.indexOf(cluster.regions, regions.get(42));

View File

@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
- import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
@ -184,7 +183,8 @@ public class TestFavoredStochasticBalancerPickers extends BalancerTestBase {
regionFinder.setClusterMetrics(admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
regionFinder.setConf(conf);
regionFinder.setServices(TEST_UTIL.getMiniHBaseCluster().getMaster());
- Cluster cluster = new Cluster(serverAssignments, null, regionFinder, new RackManager(conf));
+ BalancerClusterState cluster =
+ new BalancerClusterState(serverAssignments, null, regionFinder, new RackManager(conf));
LoadOnlyFavoredStochasticBalancer balancer = (LoadOnlyFavoredStochasticBalancer) TEST_UTIL
.getMiniHBaseCluster().getMaster().getLoadBalancer();
@ -203,13 +203,13 @@ public class TestFavoredStochasticBalancerPickers extends BalancerTestBase {
if (userRegionPicked) {
break;
} else {
- Cluster.Action action = loadPicker.generate(cluster);
- if (action.type == Cluster.Action.Type.MOVE_REGION) {
- Cluster.MoveRegionAction moveRegionAction = (Cluster.MoveRegionAction) action;
- RegionInfo region = cluster.regions[moveRegionAction.region];
- assertNotEquals(-1, moveRegionAction.toServer);
- ServerName destinationServer = cluster.servers[moveRegionAction.toServer];
- assertEquals(cluster.servers[moveRegionAction.fromServer], mostLoadedServer);
+ BalanceAction action = loadPicker.generate(cluster);
+ if (action.getType() == BalanceAction.Type.MOVE_REGION) {
+ MoveRegionAction moveRegionAction = (MoveRegionAction) action;
+ RegionInfo region = cluster.regions[moveRegionAction.getRegion()];
+ assertNotEquals(-1, moveRegionAction.getToServer());
+ ServerName destinationServer = cluster.servers[moveRegionAction.getToServer()];
+ assertEquals(cluster.servers[moveRegionAction.getFromServer()], mostLoadedServer);
if (!region.getTable().isSystemTable()) {
List<ServerName> favNodes = fnm.getFavoredNodes(region);
assertTrue(favNodes.contains(

View File

@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MockNoopMasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
- import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.ServerLocalityCostFunction;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -214,7 +213,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.CostFunction
costFunction = new StochasticLoadBalancer.MoveCostFunction(conf);
- BaseLoadBalancer.Cluster cluster = mockCluster(clusterStateMocks[0]);
+ BalancerClusterState cluster = mockCluster(clusterStateMocks[0]);
costFunction.init(cluster);
costFunction.cost();
assertEquals(StochasticLoadBalancer.MoveCostFunction.DEFAULT_MOVE_COST,
@ -241,7 +240,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
StochasticLoadBalancer.CostFunction
costFunction = new StochasticLoadBalancer.MoveCostFunction(conf);
for (int[] mockCluster : clusterStateMocks) {
- BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
+ BalancerClusterState cluster = mockCluster(mockCluster);
costFunction.init(cluster);
double cost = costFunction.cost();
assertEquals(0.0f, cost, 0.001);
@ -304,14 +303,14 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
final int runs = 10;
loadBalancer.setConf(conf);
for (int[] mockCluster : clusterStateMocks) {
- BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
+ BalancerClusterState cluster = mockCluster(mockCluster);
loadBalancer.initCosts(cluster);
for (int i = 0; i != runs; ++i) {
final double expectedCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
- Cluster.Action action = loadBalancer.nextAction(cluster);
+ BalanceAction action = loadBalancer.nextAction(cluster);
cluster.doAction(action);
loadBalancer.updateCostsWithAction(cluster, action);
- Cluster.Action undoAction = action.undoAction();
+ BalanceAction undoAction = action.undoAction();
cluster.doAction(undoAction);
loadBalancer.updateCostsWithAction(cluster, undoAction);
final double actualCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
@ -326,7 +325,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
StochasticLoadBalancer.CostFunction
costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf);
for (int[] mockCluster : clusterStateMocks) {
- BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
+ BalancerClusterState cluster = mockCluster(mockCluster);
costFunction.init(cluster);
double cost = costFunction.cost();
assertTrue(cost >= 0);
@ -459,7 +458,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
}
// This mock allows us to test the LocalityCostFunction
- private class MockCluster extends BaseLoadBalancer.Cluster {
+ private class MockCluster extends BalancerClusterState {
private int[][] localities = null; // [region][server] = percent of blocks

View File

@ -197,8 +197,8 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
final HeterogeneousRegionCountCostFunction cf =
new HeterogeneousRegionCountCostFunction(conf);
assertNotNull(cf);
- BaseLoadBalancer.Cluster cluster =
- new BaseLoadBalancer.Cluster(serverMap, null, null, null);
+ BalancerClusterState cluster =
+ new BalancerClusterState(serverMap, null, null, null);
cf.init(cluster);
// checking that we all hosts have a number of regions below their limit
@ -285,10 +285,10 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
StochasticLoadBalancer.RandomCandidateGenerator {
@Override
- public BaseLoadBalancer.Cluster.Action pickRandomRegions(BaseLoadBalancer.Cluster cluster,
+ public BalanceAction pickRandomRegions(BalancerClusterState cluster,
int thisServer, int otherServer) {
if (thisServer < 0 || otherServer < 0) {
- return BaseLoadBalancer.Cluster.NullAction;
+ return BalanceAction.NULL_ACTION;
}
int thisRegion = pickRandomRegion(cluster, thisServer, 0.5);
@ -298,7 +298,7 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
}
@Override
- BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
+ BalanceAction generate(BalancerClusterState cluster) {
return super.generate(cluster);
}
}
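The generator override above relies on the convention that a candidate generator returns the shared no-op action (now BalanceAction.NULL_ACTION instead of Cluster.NullAction) when it cannot produce a useful move; the main loop then simply skips that step, as the Type.NULL check in the earlier StochasticLoadBalancer hunk shows. A self-contained sketch of that null-object convention, using stand-in types rather than the HBase classes (NullActionSketch and its nested types are illustrative names only):

// Stand-in showing the shared no-op ("null object") action the balancer uses to
// signal "nothing to do", instead of returning null and forcing callers to check.
public class NullActionSketch {
  enum Type { MOVE_REGION, NULL }

  static class Action {
    static final Action NULL_ACTION = new Action(Type.NULL);
    final Type type;
    Action(Type type) { this.type = type; }
  }

  static class MoveAction extends Action {
    final int region, fromServer, toServer;
    MoveAction(int region, int fromServer, int toServer) {
      super(Type.MOVE_REGION);
      this.region = region;
      this.fromServer = fromServer;
      this.toServer = toServer;
    }
  }

  // A generator gives up by returning the shared NULL_ACTION; the caller skips it.
  static Action generate(int[] regionToServer, int numServers) {
    if (numServers < 2 || regionToServer.length == 0) {
      return Action.NULL_ACTION;
    }
    int region = 0; // pick deterministically for the example
    return new MoveAction(region, regionToServer[region], 1);
  }

  public static void main(String[] args) {
    Action a = generate(new int[0], 5);
    if (a.type == Type.NULL) {
      System.out.println("no candidate this step; continue");
    }
  }
}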

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
- import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.ClassRule;
@ -55,7 +54,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
StochasticLoadBalancer.CostFunction costFunction =
new StochasticLoadBalancer.RegionReplicaHostCostFunction(conf);
for (int[] mockCluster : clusterStateMocks) {
- BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
+ BalancerClusterState cluster = mockCluster(mockCluster);
costFunction.init(cluster);
double cost = costFunction.cost();
assertTrue(cost >= 0);
@ -72,9 +71,9 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
int[] servers = new int[] { 3, 3, 3, 3, 3 };
TreeMap<ServerName, List<RegionInfo>> clusterState = mockClusterServers(servers);
- BaseLoadBalancer.Cluster cluster;
- cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
+ BalancerClusterState cluster;
+ cluster = new BalancerClusterState(clusterState, null, null, null);
costFunction.init(cluster);
double costWithoutReplicas = costFunction.cost();
assertEquals(0, costWithoutReplicas, 0);
@ -84,7 +83,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
RegionReplicaUtil.getRegionInfoForReplica(clusterState.firstEntry().getValue().get(0), 1);
clusterState.lastEntry().getValue().add(replica1);
- cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
+ cluster = new BalancerClusterState(clusterState, null, null, null);
costFunction.init(cluster);
double costWith1ReplicaDifferentServer = costFunction.cost();
@ -94,7 +93,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
RegionInfo replica2 = RegionReplicaUtil.getRegionInfoForReplica(replica1, 2);
clusterState.lastEntry().getValue().add(replica2);
- cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
+ cluster = new BalancerClusterState(clusterState, null, null, null);
costFunction.init(cluster);
double costWith1ReplicaSameServer = costFunction.cost();
@ -117,7 +116,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
entry.getValue().add(replica2);
it.next().getValue().add(replica3); // 2nd server
- cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
+ cluster = new BalancerClusterState(clusterState, null, null, null);
costFunction.init(cluster);
double costWith3ReplicasSameServer = costFunction.cost();
@ -131,7 +130,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
clusterState.lastEntry().getValue().add(replica2);
clusterState.lastEntry().getValue().add(replica3);
- cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
+ cluster = new BalancerClusterState(clusterState, null, null, null);
costFunction.init(cluster);
double costWith2ReplicasOnTwoServers = costFunction.cost();
@ -152,7 +151,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
regions = randomRegions(1);
map.put(s2, regions);
assertTrue(loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME,
- new Cluster(map, null, null, null)));
+ new BalancerClusterState(map, null, null, null)));
// check for the case where there are two hosts on the same rack and there are two racks
// and both the replicas are on the same rack
map.clear();
@ -165,7 +164,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
map.put(ServerName.valueOf("host2", 1000, 11111), randomRegions(1));
assertTrue(
loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME,
- new Cluster(map, null, null, new ForTestRackManagerOne())));
+ new BalancerClusterState(map, null, null, new ForTestRackManagerOne())));
}
@Test