HBASE-25793 Move BaseLoadBalancer.Cluster to a separated file (#3185)
Signed-off-by: Yulin Niu <niuyulin@apache.org>
commit d5c5e48839 (parent 72aa741273)
AssignRegionAction.java (new file)
@@ -0,0 +1,54 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
class AssignRegionAction extends BalanceAction {
  private final int region;
  private final int server;

  public AssignRegionAction(int region, int server) {
    super(Type.ASSIGN_REGION);
    this.region = region;
    this.server = server;
  }

  public int getRegion() {
    return region;
  }

  public int getServer() {
    return server;
  }

  @Override
  public BalanceAction undoAction() {
    // TODO: implement this. This action is not being used by the StochasticLB for now;
    // if it ever is, we should implement this method.
    throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
  }

  @Override
  public String toString() {
    return getType() + ": " + region + ":" + server;
  }
}
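
Not part of the commit — an orientation sketch. Code in the same org.apache.hadoop.hbase.master.balancer package would apply this action through BalancerClusterState.doAction (the state class added further down); note that undoAction() on an ASSIGN_REGION currently throws:

    package org.apache.hadoop.hbase.master.balancer;

    // Hypothetical helper, assuming a populated BalancerClusterState `cluster`
    // and valid region/server indexes taken from its lookup maps.
    class AssignRegionActionSketch {
      static void assign(BalancerClusterState cluster, int regionIndex, int serverIndex) {
        BalanceAction action = new AssignRegionAction(regionIndex, serverIndex);
        cluster.doAction(action); // records the assignment in the index arrays
        // action.undoAction() would throw UnsupportedOperationException here:
        // ASSIGN_REGION is not yet used by the StochasticLoadBalancer.
      }
    }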

BalanceAction.java (new file)
@@ -0,0 +1,55 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * An action to move or swap a region
 */
@InterfaceAudience.Private
abstract class BalanceAction {
  enum Type {
    ASSIGN_REGION, MOVE_REGION, SWAP_REGIONS, NULL,
  }

  static final BalanceAction NULL_ACTION = new BalanceAction(Type.NULL) {
  };

  private final Type type;

  BalanceAction(Type type) {
    this.type = type;
  }

  /**
   * Returns an Action which would undo this action
   */
  BalanceAction undoAction() {
    return this;
  }

  Type getType() {
    return type;
  }

  @Override
  public String toString() {
    return type + ":";
  }
}
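
Again not part of the commit: a sketch of the contract the candidate generators rely on. NULL_ACTION is the "no candidate found" sentinel, and the base-class undoAction() returning `this` is only correct for the NULL action; the concrete subclasses override it:

    package org.apache.hadoop.hbase.master.balancer;

    // Hypothetical generator outcome, same-package assumption as above.
    class BalanceActionSketch {
      static BalanceAction proposeOrSkip(boolean foundCandidate, int region, int from, int to) {
        if (!foundCandidate) {
          return BalanceAction.NULL_ACTION; // doAction() treats Type.NULL as a no-op
        }
        return new MoveRegionAction(region, from, to); // defined later in this commit
      }
    }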

BalancerClusterState.java (new file)
@@ -0,0 +1,865 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An efficient array-based implementation similar to ClusterState for keeping the status of the
 * cluster in terms of region assignment and distribution. LoadBalancers such as the
 * StochasticLoadBalancer use this Cluster object because hundreds of thousands of hashmap
 * manipulations would be very costly, which is why this class mostly uses indexes and arrays.
 * <p/>
 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server
 * topology in terms of server names, hostnames and racks.
 */
@InterfaceAudience.Private
class BalancerClusterState {

  private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class);

  ServerName[] servers;
  // ServerName uniquely identifies a region server. Multiple RS can run on the same host.
  String[] hosts;
  String[] racks;
  boolean multiServersPerHost = false; // whether or not any host has more than one server

  ArrayList<String> tables;
  RegionInfo[] regions;
  Deque<BalancerRegionLoad>[] regionLoads;
  private RegionHDFSBlockLocationFinder regionFinder;

  int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality

  int[] serverIndexToHostIndex; // serverIndex -> host index
  int[] serverIndexToRackIndex; // serverIndex -> rack index

  int[][] regionsPerServer; // serverIndex -> region list
  int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list
  int[][] regionsPerHost; // hostIndex -> list of regions
  int[][] regionsPerRack; // rackIndex -> region list
  int[][] primariesOfRegionsPerServer; // serverIndex -> sorted list of regions by primary region
                                       // index
  int[][] primariesOfRegionsPerHost; // hostIndex -> sorted list of regions by primary region index
  int[][] primariesOfRegionsPerRack; // rackIndex -> sorted list of regions by primary region index

  int[][] serversPerHost; // hostIndex -> list of server indexes
  int[][] serversPerRack; // rackIndex -> list of server indexes
  int[] regionIndexToServerIndex; // regionIndex -> serverIndex
  int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
  int[] regionIndexToTableIndex; // regionIndex -> tableIndex
  int[][] numRegionsPerServerPerTable; // serverIndex -> tableIndex -> # regions
  int[] numMaxRegionsPerTable; // tableIndex -> max number of regions in a single RS
  int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
  boolean hasRegionReplicas = false; // whether there are regions with replicas

  Integer[] serverIndicesSortedByRegionCount;
  Integer[] serverIndicesSortedByLocality;

  Map<Address, Integer> serversToIndex;
  Map<String, Integer> hostsToIndex;
  Map<String, Integer> racksToIndex;
  Map<String, Integer> tablesToIndex;
  Map<RegionInfo, Integer> regionsToIndex;
  float[] localityPerServer;

  int numServers;
  int numHosts;
  int numRacks;
  int numTables;
  int numRegions;

  int numMovedRegions = 0; // num moved regions from the initial configuration
  Map<ServerName, List<RegionInfo>> clusterState;

  private final RackManager rackManager;
  // Maps region -> rackIndex -> locality of region on rack
  private float[][] rackLocalities;
  // Maps localityType -> region -> [server|rack]Index with highest locality
  private int[][] regionsToMostLocalEntities;

  static class DefaultRackManager extends RackManager {
    @Override
    public String getRack(ServerName server) {
      return UNKNOWN_RACK;
    }
  }

  BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
    RackManager rackManager) {
    this(null, clusterState, loads, regionFinder, rackManager);
  }

  @SuppressWarnings("unchecked")
  BalancerClusterState(Collection<RegionInfo> unassignedRegions,
    Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
    RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) {
    if (unassignedRegions == null) {
      unassignedRegions = Collections.emptyList();
    }

    serversToIndex = new HashMap<>();
    hostsToIndex = new HashMap<>();
    racksToIndex = new HashMap<>();
    tablesToIndex = new HashMap<>();

    // TODO: We should get the list of tables from master
    tables = new ArrayList<>();
    this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

    numRegions = 0;

    List<List<Integer>> serversPerHostList = new ArrayList<>();
    List<List<Integer>> serversPerRackList = new ArrayList<>();
    this.clusterState = clusterState;
    this.regionFinder = regionFinder;

    // Use servername and port as there can be dead servers in this list. We want everything with
    // a matching hostname and port to have the same index.
    for (ServerName sn : clusterState.keySet()) {
      if (sn == null) {
        LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername; " +
          "skipping; unassigned regions?");
        if (LOG.isTraceEnabled()) {
          LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
        }
        continue;
      }
      if (serversToIndex.get(sn.getAddress()) == null) {
        serversToIndex.put(sn.getAddress(), numServers++);
      }
      if (!hostsToIndex.containsKey(sn.getHostname())) {
        hostsToIndex.put(sn.getHostname(), numHosts++);
        serversPerHostList.add(new ArrayList<>(1));
      }

      int serverIndex = serversToIndex.get(sn.getAddress());
      int hostIndex = hostsToIndex.get(sn.getHostname());
      serversPerHostList.get(hostIndex).add(serverIndex);

      String rack = this.rackManager.getRack(sn);
      if (!racksToIndex.containsKey(rack)) {
        racksToIndex.put(rack, numRacks++);
        serversPerRackList.add(new ArrayList<>());
      }
      int rackIndex = racksToIndex.get(rack);
      serversPerRackList.get(rackIndex).add(serverIndex);
    }

    // Count how many regions there are.
    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
      numRegions += entry.getValue().size();
    }
    numRegions += unassignedRegions.size();

    regionsToIndex = new HashMap<>(numRegions);
    servers = new ServerName[numServers];
    serversPerHost = new int[numHosts][];
    serversPerRack = new int[numRacks][];
    regions = new RegionInfo[numRegions];
    regionIndexToServerIndex = new int[numRegions];
    initialRegionIndexToServerIndex = new int[numRegions];
    regionIndexToTableIndex = new int[numRegions];
    regionIndexToPrimaryIndex = new int[numRegions];
    regionLoads = new Deque[numRegions];

    regionLocations = new int[numRegions][];
    serverIndicesSortedByRegionCount = new Integer[numServers];
    serverIndicesSortedByLocality = new Integer[numServers];
    localityPerServer = new float[numServers];

    serverIndexToHostIndex = new int[numServers];
    serverIndexToRackIndex = new int[numServers];
    regionsPerServer = new int[numServers][];
    serverIndexToRegionsOffset = new int[numServers];
    regionsPerHost = new int[numHosts][];
    regionsPerRack = new int[numRacks][];
    primariesOfRegionsPerServer = new int[numServers][];
    primariesOfRegionsPerHost = new int[numHosts][];
    primariesOfRegionsPerRack = new int[numRacks][];

    int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;

    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
      if (entry.getKey() == null) {
        LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
        continue;
      }
      int serverIndex = serversToIndex.get(entry.getKey().getAddress());

      // keep the servername if this is the first server name for this hostname
      // or this servername has the newest startcode.
      if (servers[serverIndex] == null ||
        servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
        servers[serverIndex] = entry.getKey();
      }

      if (regionsPerServer[serverIndex] != null) {
        // there is another server with the same hostAndPort in ClusterState.
        // allocate the array for the total size
        regionsPerServer[serverIndex] =
          new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
      } else {
        regionsPerServer[serverIndex] = new int[entry.getValue().size()];
      }
      primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
      serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
      serverIndicesSortedByLocality[serverIndex] = serverIndex;
    }

    hosts = new String[numHosts];
    for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) {
      hosts[entry.getValue()] = entry.getKey();
    }
    racks = new String[numRacks];
    for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) {
      racks[entry.getValue()] = entry.getKey();
    }

    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
      regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];

      int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
      serverIndexToHostIndex[serverIndex] = hostIndex;

      int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
      serverIndexToRackIndex[serverIndex] = rackIndex;

      for (RegionInfo region : entry.getValue()) {
        registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
        regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
        regionIndex++;
      }
      serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
    }

    for (RegionInfo region : unassignedRegions) {
      registerRegion(region, regionIndex, -1, loads, regionFinder);
      regionIndex++;
    }

    for (int i = 0; i < serversPerHostList.size(); i++) {
      serversPerHost[i] = new int[serversPerHostList.get(i).size()];
      for (int j = 0; j < serversPerHost[i].length; j++) {
        serversPerHost[i][j] = serversPerHostList.get(i).get(j);
      }
      if (serversPerHost[i].length > 1) {
        multiServersPerHost = true;
      }
    }

    for (int i = 0; i < serversPerRackList.size(); i++) {
      serversPerRack[i] = new int[serversPerRackList.get(i).size()];
      for (int j = 0; j < serversPerRack[i].length; j++) {
        serversPerRack[i][j] = serversPerRackList.get(i).get(j);
      }
    }

    numTables = tables.size();
    numRegionsPerServerPerTable = new int[numServers][numTables];

    for (int i = 0; i < numServers; i++) {
      for (int j = 0; j < numTables; j++) {
        numRegionsPerServerPerTable[i][j] = 0;
      }
    }

    for (int i = 0; i < regionIndexToServerIndex.length; i++) {
      if (regionIndexToServerIndex[i] >= 0) {
        numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
      }
    }

    numMaxRegionsPerTable = new int[numTables];
    for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
      for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
        if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
          numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
        }
      }
    }

    for (int i = 0; i < regions.length; i++) {
      RegionInfo info = regions[i];
      if (RegionReplicaUtil.isDefaultReplica(info)) {
        regionIndexToPrimaryIndex[i] = i;
      } else {
        hasRegionReplicas = true;
        RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
        regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
      }
    }

    for (int i = 0; i < regionsPerServer.length; i++) {
      primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
      for (int j = 0; j < regionsPerServer[i].length; j++) {
        int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
        primariesOfRegionsPerServer[i][j] = primaryIndex;
      }
      // sort the regions by primaries.
      Arrays.sort(primariesOfRegionsPerServer[i]);
    }

    // compute regionsPerHost
    if (multiServersPerHost) {
      for (int i = 0; i < serversPerHost.length; i++) {
        int numRegionsPerHost = 0;
        for (int j = 0; j < serversPerHost[i].length; j++) {
          numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
        }
        regionsPerHost[i] = new int[numRegionsPerHost];
        primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
      }
      for (int i = 0; i < serversPerHost.length; i++) {
        int numRegionPerHostIndex = 0;
        for (int j = 0; j < serversPerHost[i].length; j++) {
          for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
            int region = regionsPerServer[serversPerHost[i][j]][k];
            regionsPerHost[i][numRegionPerHostIndex] = region;
            int primaryIndex = regionIndexToPrimaryIndex[region];
            primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
            numRegionPerHostIndex++;
          }
        }
        // sort the regions by primaries.
        Arrays.sort(primariesOfRegionsPerHost[i]);
      }
    }

    // compute regionsPerRack
    if (numRacks > 1) {
      for (int i = 0; i < serversPerRack.length; i++) {
        int numRegionsPerRack = 0;
        for (int j = 0; j < serversPerRack[i].length; j++) {
          numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
        }
        regionsPerRack[i] = new int[numRegionsPerRack];
        primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
      }

      for (int i = 0; i < serversPerRack.length; i++) {
        int numRegionPerRackIndex = 0;
        for (int j = 0; j < serversPerRack[i].length; j++) {
          for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
            int region = regionsPerServer[serversPerRack[i][j]][k];
            regionsPerRack[i][numRegionPerRackIndex] = region;
            int primaryIndex = regionIndexToPrimaryIndex[region];
            primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
            numRegionPerRackIndex++;
          }
        }
        // sort the regions by primaries.
        Arrays.sort(primariesOfRegionsPerRack[i]);
      }
    }
  }

  /** Helper for Cluster constructor to handle a region */
  private void registerRegion(RegionInfo region, int regionIndex, int serverIndex,
    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) {
    String tableName = region.getTable().getNameAsString();
    if (!tablesToIndex.containsKey(tableName)) {
      tables.add(tableName);
      tablesToIndex.put(tableName, tablesToIndex.size());
    }
    int tableIndex = tablesToIndex.get(tableName);

    regionsToIndex.put(region, regionIndex);
    regions[regionIndex] = region;
    regionIndexToServerIndex[regionIndex] = serverIndex;
    initialRegionIndexToServerIndex[regionIndex] = serverIndex;
    regionIndexToTableIndex[regionIndex] = tableIndex;

    // region load
    if (loads != null) {
      Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
      // That could have failed if the RegionLoad is using the other regionName
      if (rl == null) {
        // Try getting the region load using encoded name.
        rl = loads.get(region.getEncodedName());
      }
      regionLoads[regionIndex] = rl;
    }

    if (regionFinder != null) {
      // region location
      List<ServerName> loc = regionFinder.getTopBlockLocations(region);
      regionLocations[regionIndex] = new int[loc.size()];
      for (int i = 0; i < loc.size(); i++) {
        regionLocations[regionIndex][i] = loc.get(i) == null ? -1 :
          (serversToIndex.get(loc.get(i).getAddress()) == null ? -1 :
            serversToIndex.get(loc.get(i).getAddress()));
      }
    }
  }

  /**
   * Returns true iff a given server has fewer regions than the balanced amount
   */
  public boolean serverHasTooFewRegions(int server) {
    int minLoad = this.numRegions / numServers;
    int numRegions = getNumRegions(server);
    return numRegions < minLoad;
  }

  /**
   * Retrieves and lazily initializes a field storing the locality of every region/rack
   * combination
   */
  public float[][] getOrComputeRackLocalities() {
    if (rackLocalities == null || regionsToMostLocalEntities == null) {
      computeCachedLocalities();
    }
    return rackLocalities;
  }

  /**
   * Lazily initializes and retrieves a mapping of region -> server for which region has the
   * highest locality
   */
  public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) {
    if (rackLocalities == null || regionsToMostLocalEntities == null) {
      computeCachedLocalities();
    }
    return regionsToMostLocalEntities[type.ordinal()];
  }

  /**
   * Looks up locality from cache of localities. Will create cache if it does not already exist.
   */
  public float getOrComputeLocality(int region, int entity,
    BalancerClusterState.LocalityType type) {
    switch (type) {
      case SERVER:
        return getLocalityOfRegion(region, entity);
      case RACK:
        return getOrComputeRackLocalities()[region][entity];
      default:
        throw new IllegalArgumentException("Unsupported LocalityType: " + type);
    }
  }

  /**
   * Returns locality weighted by region size in MB. Will create locality cache if it does not
   * already exist.
   */
  public double getOrComputeWeightedLocality(int region, int server,
    BalancerClusterState.LocalityType type) {
    return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
  }

  /**
   * Returns the size in MB from the most recent RegionLoad for region
   */
  public int getRegionSizeMB(int region) {
    Deque<BalancerRegionLoad> load = regionLoads[region];
    // This means regions have no actual data on disk
    if (load == null) {
      return 0;
    }
    return regionLoads[region].getLast().getStorefileSizeMB();
  }

  /**
   * Computes and caches the locality for each region/rack combination, as well as storing a
   * mapping of region -> server and region -> rack such that server and rack have the highest
   * locality for region
   */
  private void computeCachedLocalities() {
    rackLocalities = new float[numRegions][numRacks];
    regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];

    // Compute localities and find most local server per region
    for (int region = 0; region < numRegions; region++) {
      int serverWithBestLocality = 0;
      float bestLocalityForRegion = 0;
      for (int server = 0; server < numServers; server++) {
        // Aggregate per-rack locality
        float locality = getLocalityOfRegion(region, server);
        int rack = serverIndexToRackIndex[server];
        int numServersInRack = serversPerRack[rack].length;
        rackLocalities[region][rack] += locality / numServersInRack;

        if (locality > bestLocalityForRegion) {
          serverWithBestLocality = server;
          bestLocalityForRegion = locality;
        }
      }
      regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;

      // Find most local rack per region
      int rackWithBestLocality = 0;
      float bestRackLocalityForRegion = 0.0f;
      for (int rack = 0; rack < numRacks; rack++) {
        float rackLocality = rackLocalities[region][rack];
        if (rackLocality > bestRackLocalityForRegion) {
          bestRackLocalityForRegion = rackLocality;
          rackWithBestLocality = rack;
        }
      }
      regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
    }
  }

  /**
   * Maps region index to rack index
   */
  public int getRackForRegion(int region) {
    return serverIndexToRackIndex[regionIndexToServerIndex[region]];
  }

  enum LocalityType {
    SERVER, RACK
  }

  public void doAction(BalanceAction action) {
    switch (action.getType()) {
      case NULL:
        break;
      case ASSIGN_REGION:
        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
        assert action instanceof AssignRegionAction : action.getClass();
        AssignRegionAction ar = (AssignRegionAction) action;
        regionsPerServer[ar.getServer()] =
          addRegion(regionsPerServer[ar.getServer()], ar.getRegion());
        regionMoved(ar.getRegion(), -1, ar.getServer());
        break;
      case MOVE_REGION:
        assert action instanceof MoveRegionAction : action.getClass();
        MoveRegionAction mra = (MoveRegionAction) action;
        regionsPerServer[mra.getFromServer()] =
          removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion());
        regionsPerServer[mra.getToServer()] =
          addRegion(regionsPerServer[mra.getToServer()], mra.getRegion());
        regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
        break;
      case SWAP_REGIONS:
        assert action instanceof SwapRegionsAction : action.getClass();
        SwapRegionsAction a = (SwapRegionsAction) action;
        regionsPerServer[a.getFromServer()] =
          replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion());
        regionsPerServer[a.getToServer()] =
          replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion());
        regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
        regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
        break;
      default:
        throw new RuntimeException("Unknown action: " + action.getType());
    }
  }

  /**
   * Return true if the placement of region on server would lower the availability of the region in
   * question
   * @return {@code true} if the placement would lower availability, {@code false} otherwise
   */
  boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
    if (!serversToIndex.containsKey(serverName.getAddress())) {
      // safeguard against race between cluster.servers and servers from LB method args
      return false;
    }
    int server = serversToIndex.get(serverName.getAddress());
    int region = regionsToIndex.get(regionInfo);

    // Region replicas of the same region should be assigned to different servers
    for (int i : regionsPerServer[server]) {
      RegionInfo otherRegionInfo = regions[i];
      if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
        return true;
      }
    }

    int primary = regionIndexToPrimaryIndex[region];
    if (primary == -1) {
      return false;
    }
    // there is a subset relation for server < host < rack
    // check server first
    if (contains(primariesOfRegionsPerServer[server], primary)) {
      // check for whether there are other servers that we can place this region
      for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
        if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
          return true; // meaning there is a better server
        }
      }
      return false; // there is not a better server to place this
    }

    // check host
    if (multiServersPerHost) {
      // these arrays would only be allocated if we have more than one server per host
      int host = serverIndexToHostIndex[server];
      if (contains(primariesOfRegionsPerHost[host], primary)) {
        // check for whether there are other hosts that we can place this region
        for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
          if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
            return true; // meaning there is a better host
          }
        }
        return false; // there is not a better host to place this
      }
    }

    // check rack
    if (numRacks > 1) {
      int rack = serverIndexToRackIndex[server];
      if (contains(primariesOfRegionsPerRack[rack], primary)) {
        // check for whether there are other racks that we can place this region
        for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
          if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
            return true; // meaning there is a better rack
          }
        }
        return false; // there is not a better rack to place this
      }
    }

    return false;
  }

  void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
    if (!serversToIndex.containsKey(serverName.getAddress())) {
      return;
    }
    int server = serversToIndex.get(serverName.getAddress());
    int region = regionsToIndex.get(regionInfo);
    doAction(new AssignRegionAction(region, server));
  }

  void regionMoved(int region, int oldServer, int newServer) {
    regionIndexToServerIndex[region] = newServer;
    if (initialRegionIndexToServerIndex[region] == newServer) {
      numMovedRegions--; // region moved back to original location
    } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
      numMovedRegions++; // region moved from original location
    }
    int tableIndex = regionIndexToTableIndex[region];
    if (oldServer >= 0) {
      numRegionsPerServerPerTable[oldServer][tableIndex]--;
    }
    numRegionsPerServerPerTable[newServer][tableIndex]++;

    // check whether this caused maxRegionsPerTable in the new Server to be updated
    if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
      numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
    } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex]
      + 1) == numMaxRegionsPerTable[tableIndex]) {
      // recompute maxRegionsPerTable since the previous value was coming from the old server
      numMaxRegionsPerTable[tableIndex] = 0;
      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
        if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
          numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
        }
      }
    }

    // update for servers
    int primary = regionIndexToPrimaryIndex[region];
    if (oldServer >= 0) {
      primariesOfRegionsPerServer[oldServer] =
        removeRegion(primariesOfRegionsPerServer[oldServer], primary);
    }
    primariesOfRegionsPerServer[newServer] =
      addRegionSorted(primariesOfRegionsPerServer[newServer], primary);

    // update for hosts
    if (multiServersPerHost) {
      int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
      int newHost = serverIndexToHostIndex[newServer];
      if (newHost != oldHost) {
        regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
        primariesOfRegionsPerHost[newHost] =
          addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
        if (oldHost >= 0) {
          regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
          primariesOfRegionsPerHost[oldHost] =
            removeRegion(primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
        }
      }
    }

    // update for racks
    if (numRacks > 1) {
      int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
      int newRack = serverIndexToRackIndex[newServer];
      if (newRack != oldRack) {
        regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
        primariesOfRegionsPerRack[newRack] =
          addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
        if (oldRack >= 0) {
          regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
          primariesOfRegionsPerRack[oldRack] =
            removeRegion(primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
        }
      }
    }
  }

  int[] removeRegion(int[] regions, int regionIndex) {
    // TODO: this may be costly. Consider using linked lists
    int[] newRegions = new int[regions.length - 1];
    int i = 0;
    for (i = 0; i < regions.length; i++) {
      if (regions[i] == regionIndex) {
        break;
      }
      newRegions[i] = regions[i];
    }
    System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
    return newRegions;
  }

  int[] addRegion(int[] regions, int regionIndex) {
    int[] newRegions = new int[regions.length + 1];
    System.arraycopy(regions, 0, newRegions, 0, regions.length);
    newRegions[newRegions.length - 1] = regionIndex;
    return newRegions;
  }

  int[] addRegionSorted(int[] regions, int regionIndex) {
    int[] newRegions = new int[regions.length + 1];
    int i = 0;
    for (i = 0; i < regions.length; i++) { // find the index to insert
      if (regions[i] > regionIndex) {
        break;
      }
    }
    System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
    System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half
    newRegions[i] = regionIndex;

    return newRegions;
  }

  int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
    int i = 0;
    for (i = 0; i < regions.length; i++) {
      if (regions[i] == regionIndex) {
        regions[i] = newRegionIndex;
        break;
      }
    }
    return regions;
  }

  void sortServersByRegionCount() {
    Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
  }

  int getNumRegions(int server) {
    return regionsPerServer[server].length;
  }

  boolean contains(int[] arr, int val) {
    return Arrays.binarySearch(arr, val) >= 0;
  }

  private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);

  int getLowestLocalityRegionOnServer(int serverIndex) {
    if (regionFinder != null) {
      float lowestLocality = 1.0f;
      int lowestLocalityRegionIndex = -1;
      if (regionsPerServer[serverIndex].length == 0) {
        // No regions on that region server
        return -1;
      }
      for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
        int regionIndex = regionsPerServer[serverIndex][j];
        HDFSBlocksDistribution distribution =
          regionFinder.getBlockDistribution(regions[regionIndex]);
        float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
        // skip empty region
        if (distribution.getUniqueBlocksTotalWeight() == 0) {
          continue;
        }
        if (locality < lowestLocality) {
          lowestLocality = locality;
          lowestLocalityRegionIndex = j;
        }
      }
      if (lowestLocalityRegionIndex == -1) {
        return -1;
      }
      if (LOG.isTraceEnabled()) {
        LOG.trace("Lowest locality region is " +
          regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
            .getRegionNameAsString() +
          " with locality " + lowestLocality + " and its region server contains " +
          regionsPerServer[serverIndex].length + " regions");
      }
      return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
    } else {
      return -1;
    }
  }

  float getLocalityOfRegion(int region, int server) {
    if (regionFinder != null) {
      HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
      return distribution.getBlockLocalityIndex(servers[server].getHostname());
    } else {
      return 0f;
    }
  }

  void setNumRegions(int numRegions) {
    this.numRegions = numRegions;
  }

  void setNumMovedRegions(int numMovedRegions) {
    this.numMovedRegions = numMovedRegions;
  }

  @Override
  public String toString() {
    StringBuilder desc = new StringBuilder("Cluster={servers=[");
    for (ServerName sn : servers) {
      desc.append(sn.getAddress().toString()).append(", ");
    }
    desc.append("], serverIndicesSortedByRegionCount=")
      .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
      .append(Arrays.deepToString(regionsPerServer));

    desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
      .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
      .append(", numTables=").append(numTables).append(", numMovedRegions=")
      .append(numMovedRegions).append('}');
    return desc.toString();
  }
}
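
To make the array bookkeeping concrete, here is a self-contained sketch (editor's demo, not in the commit; plain Java with no HBase imports). The helpers mirror removeRegion/addRegion above, and the two-server, four-region cluster is invented:

    import java.util.Arrays;

    // Editor's demo: copy-on-write int[] bookkeeping in the style of
    // BalancerClusterState.removeRegion/addRegion (re-implemented here so the
    // snippet runs without HBase on the classpath).
    public class RegionArrayDemo {
      static int[] removeRegion(int[] regions, int regionIndex) {
        int[] newRegions = new int[regions.length - 1];
        int i = 0;
        for (i = 0; i < regions.length; i++) {
          if (regions[i] == regionIndex) {
            break;
          }
          newRegions[i] = regions[i];
        }
        System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
        return newRegions;
      }

      static int[] addRegion(int[] regions, int regionIndex) {
        int[] newRegions = new int[regions.length + 1];
        System.arraycopy(regions, 0, newRegions, 0, regions.length);
        newRegions[newRegions.length - 1] = regionIndex;
        return newRegions;
      }

      public static void main(String[] args) {
        int[][] regionsPerServer = { { 0, 1, 2 }, { 3 } }; // serverIndex -> region indexes
        // What doAction(new MoveRegionAction(1, 0, 1)) does to the model:
        regionsPerServer[0] = removeRegion(regionsPerServer[0], 1);
        regionsPerServer[1] = addRegion(regionsPerServer[1], 1);
        System.out.println(Arrays.deepToString(regionsPerServer)); // [[0, 2], [3, 1]]
      }
    }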

MoveRegionAction.java (new file)
@@ -0,0 +1,57 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master.balancer;

import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
class MoveRegionAction extends BalanceAction {
  private final int region;
  private final int fromServer;
  private final int toServer;

  public MoveRegionAction(int region, int fromServer, int toServer) {
    super(Type.MOVE_REGION);
    this.fromServer = fromServer;
    this.region = region;
    this.toServer = toServer;
  }

  public int getRegion() {
    return region;
  }

  public int getFromServer() {
    return fromServer;
  }

  public int getToServer() {
    return toServer;
  }

  @Override
  public BalanceAction undoAction() {
    return new MoveRegionAction(region, toServer, fromServer);
  }

  @Override
  public String toString() {
    return getType() + ": " + region + ":" + fromServer + " -> " + toServer;
  }
}
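
A hypothetical same-package sketch (not in the commit) of why the move action's undoAction() matters: the stochastic balancer can trial a move on the in-memory model and back it out by applying the inverse move:

    package org.apache.hadoop.hbase.master.balancer;

    // Editor's sketch: MoveRegionAction is invertible via undoAction().
    class MoveUndoSketch {
      static void tryAndRevert(BalancerClusterState cluster, int region, int from, int to) {
        MoveRegionAction move = new MoveRegionAction(region, from, to);
        cluster.doAction(move);              // trial placement
        cluster.doAction(move.undoAction()); // MoveRegionAction(region, to, from): state restored
      }
    }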

SwapRegionsAction.java (new file)
@@ -0,0 +1,62 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class SwapRegionsAction extends BalanceAction {
  private final int fromServer;
  private final int fromRegion;
  private final int toServer;
  private final int toRegion;

  SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
    super(Type.SWAP_REGIONS);
    this.fromServer = fromServer;
    this.fromRegion = fromRegion;
    this.toServer = toServer;
    this.toRegion = toRegion;
  }

  public int getFromServer() {
    return fromServer;
  }

  public int getFromRegion() {
    return fromRegion;
  }

  public int getToServer() {
    return toServer;
  }

  public int getToRegion() {
    return toRegion;
  }

  @Override
  public BalanceAction undoAction() {
    return new SwapRegionsAction(fromServer, toRegion, toServer, fromRegion);
  }

  @Override
  public String toString() {
    return getType() + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
  }
}
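
Likewise a hypothetical sketch for swaps: undoAction() keeps the two servers but exchanges the region arguments, so applying the swap and then its undo returns both regions to where they started:

    package org.apache.hadoop.hbase.master.balancer;

    // Editor's sketch: swap-then-undo is a round trip on the cluster model.
    class SwapUndoSketch {
      static void swapAndRevert(BalancerClusterState cluster, int serverA, int regionA,
        int serverB, int regionB) {
        SwapRegionsAction swap = new SwapRegionsAction(serverA, regionA, serverB, regionB);
        cluster.doAction(swap);
        cluster.doAction(swap.undoAction()); // SwapRegionsAction(serverA, regionB, serverB, regionA)
      }
    }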

[File diff suppressed because it is too large]

CandidateGenerator.java
@@ -28,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 abstract class CandidateGenerator {
 
-  abstract BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster);
+  abstract BalanceAction generate(BalancerClusterState cluster);
 
   /**
    * From a list of regions pick a random one. Null can be returned which
@@ -42,7 +42,7 @@ abstract class CandidateGenerator {
    * @return a random {@link RegionInfo} or null if an asymmetrical move is
    *         suggested.
    */
-  int pickRandomRegion(BaseLoadBalancer.Cluster cluster, int server,
+  int pickRandomRegion(BalancerClusterState cluster, int server,
     double chanceOfNoSwap) {
     // Check to see if this is just a move.
     if (cluster.regionsPerServer[server].length == 0
@@ -54,7 +54,7 @@ abstract class CandidateGenerator {
     return cluster.regionsPerServer[server][rand];
   }
 
-  int pickRandomServer(BaseLoadBalancer.Cluster cluster) {
+  int pickRandomServer(BalancerClusterState cluster) {
     if (cluster.numServers < 1) {
       return -1;
     }
@@ -62,7 +62,7 @@ abstract class CandidateGenerator {
     return StochasticLoadBalancer.RANDOM.nextInt(cluster.numServers);
   }
 
-  int pickRandomRack(BaseLoadBalancer.Cluster cluster) {
+  int pickRandomRack(BalancerClusterState cluster) {
     if (cluster.numRacks < 1) {
       return -1;
     }
@@ -70,7 +70,7 @@ abstract class CandidateGenerator {
     return StochasticLoadBalancer.RANDOM.nextInt(cluster.numRacks);
   }
 
-  int pickOtherRandomServer(BaseLoadBalancer.Cluster cluster, int serverIndex) {
+  int pickOtherRandomServer(BalancerClusterState cluster, int serverIndex) {
     if (cluster.numServers < 2) {
       return -1;
     }
@@ -82,7 +82,7 @@ abstract class CandidateGenerator {
     }
   }
 
-  int pickOtherRandomRack(BaseLoadBalancer.Cluster cluster, int rackIndex) {
+  int pickOtherRandomRack(BalancerClusterState cluster, int rackIndex) {
     if (cluster.numRacks < 2) {
       return -1;
     }
@@ -94,10 +94,10 @@ abstract class CandidateGenerator {
     }
   }
 
-  BaseLoadBalancer.Cluster.Action pickRandomRegions(BaseLoadBalancer.Cluster cluster,
+  BalanceAction pickRandomRegions(BalancerClusterState cluster,
     int thisServer, int otherServer) {
     if (thisServer < 0 || otherServer < 0) {
-      return BaseLoadBalancer.Cluster.NullAction;
+      return BalanceAction.NULL_ACTION;
     }
 
     // Decide who is most likely to need another region
@@ -114,20 +114,20 @@ abstract class CandidateGenerator {
     return getAction(thisServer, thisRegion, otherServer, otherRegion);
   }
 
-  protected BaseLoadBalancer.Cluster.Action getAction(int fromServer, int fromRegion,
+  protected BalanceAction getAction(int fromServer, int fromRegion,
     int toServer, int toRegion) {
     if (fromServer < 0 || toServer < 0) {
-      return BaseLoadBalancer.Cluster.NullAction;
+      return BalanceAction.NULL_ACTION;
     }
     if (fromRegion >= 0 && toRegion >= 0) {
-      return new BaseLoadBalancer.Cluster.SwapRegionsAction(fromServer, fromRegion,
+      return new SwapRegionsAction(fromServer, fromRegion,
         toServer, toRegion);
     } else if (fromRegion >= 0) {
-      return new BaseLoadBalancer.Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
+      return new MoveRegionAction(fromRegion, fromServer, toServer);
     } else if (toRegion >= 0) {
-      return new BaseLoadBalancer.Cluster.MoveRegionAction(toRegion, toServer, fromServer);
+      return new MoveRegionAction(toRegion, toServer, fromServer);
     } else {
-      return BaseLoadBalancer.Cluster.NullAction;
+      return BalanceAction.NULL_ACTION;
    }
  }
 }
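
The net effect of these renames, shown as a hypothetical same-package sketch (not in the commit): a generator's proposal is now a standalone BalanceAction that can be applied to a BalancerClusterState without referencing any BaseLoadBalancer inner class:

    package org.apache.hadoop.hbase.master.balancer;

    // Editor's sketch of how a generator's output is consumed after the refactor.
    class GeneratorUseSketch {
      static void step(CandidateGenerator generator, BalancerClusterState cluster) {
        BalanceAction action = generator.generate(cluster);
        if (action.getType() != BalanceAction.Type.NULL) {
          cluster.doAction(action); // apply the proposed move/swap to the model
        }
      }
    }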

FavoredStochasticBalancer.java
@@ -525,13 +525,13 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
   private class FavoredNodeLocalityPicker extends CandidateGenerator {
 
     @Override
-    protected Cluster.Action generate(Cluster cluster) {
+    protected BalanceAction generate(BalancerClusterState cluster) {
 
       int thisServer = pickRandomServer(cluster);
       int thisRegion;
       if (thisServer == -1) {
         LOG.trace("Could not pick lowest local region server");
-        return Cluster.NullAction;
+        return BalanceAction.NULL_ACTION;
       } else {
         // Pick lowest local region on this server
         thisRegion = pickLowestLocalRegionOnServer(cluster, thisServer);
@@ -541,7 +541,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
           LOG.trace("Could not pick lowest local region even when region server held "
             + cluster.regionsPerServer[thisServer].length + " regions");
         }
-        return Cluster.NullAction;
+        return BalanceAction.NULL_ACTION;
       }
 
       RegionInfo hri = cluster.regions[thisRegion];
@@ -553,7 +553,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
         } else {
           // No FN, ignore
           LOG.trace("Ignoring, no favored nodes for region: " + hri);
-          return Cluster.NullAction;
+          return BalanceAction.NULL_ACTION;
         }
       } else {
         // Pick other favored node with the highest locality
@@ -562,7 +562,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
       return getAction(thisServer, thisRegion, otherServer, -1);
     }
 
-    private int getDifferentFavoredNode(Cluster cluster, List<ServerName> favoredNodes,
+    private int getDifferentFavoredNode(BalancerClusterState cluster, List<ServerName> favoredNodes,
       int currentServer) {
       List<Integer> fnIndex = new ArrayList<>();
       for (ServerName sn : favoredNodes) {
@@ -584,7 +584,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
       return highestLocalRSIndex;
     }
 
-    private int pickLowestLocalRegionOnServer(Cluster cluster, int server) {
+    private int pickLowestLocalRegionOnServer(BalancerClusterState cluster, int server) {
       return cluster.getLowestLocalityRegionOnServer(server);
     }
   }
@@ -596,7 +596,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
   class FavoredNodeLoadPicker extends CandidateGenerator {
 
     @Override
-    Cluster.Action generate(Cluster cluster) {
+    BalanceAction generate(BalancerClusterState cluster) {
       cluster.sortServersByRegionCount();
       int thisServer = pickMostLoadedServer(cluster);
       int thisRegion = pickRandomRegion(cluster, thisServer, 0);
@@ -607,7 +607,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
         if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
           otherServer = pickLeastLoadedServer(cluster, thisServer);
         } else {
-          return Cluster.NullAction;
+          return BalanceAction.NULL_ACTION;
         }
       } else {
         otherServer = pickLeastLoadedFNServer(cluster, favoredNodes, thisServer);
@@ -615,7 +615,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
       return getAction(thisServer, thisRegion, otherServer, -1);
     }
 
-    private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
+    private int pickLeastLoadedServer(final BalancerClusterState cluster, int thisServer) {
       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
       int index;
       for (index = 0; index < servers.length ; index++) {
@@ -626,8 +626,8 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
       return servers[index];
     }
 
-    private int pickLeastLoadedFNServer(final Cluster cluster, List<ServerName> favoredNodes,
-      int currentServerIndex) {
+    private int pickLeastLoadedFNServer(final BalancerClusterState cluster,
+      List<ServerName> favoredNodes, int currentServerIndex) {
       List<Integer> fnIndex = new ArrayList<>();
       for (ServerName sn : favoredNodes) {
         if (cluster.serversToIndex.containsKey(sn.getAddress())) {
@@ -648,7 +648,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
       return leastLoadedFN;
     }
 
-    private int pickMostLoadedServer(final Cluster cluster) {
+    private int pickMostLoadedServer(final BalancerClusterState cluster) {
       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
       int index;
       for (index = servers.length - 1; index > 0 ; index--) {

HeterogeneousRegionCountCostFunction.java
@@ -120,7 +120,7 @@ public class HeterogeneousRegionCountCostFunction extends StochasticLoadBalancer
    * any costly calculation.
    */
   @Override
-  void init(final BaseLoadBalancer.Cluster cluster) {
+  void init(final BalancerClusterState cluster) {
     this.cluster = cluster;
     this.loadRules();
   }

LoadCandidateGenerator.java
@@ -24,14 +24,14 @@ import org.apache.yetus.audience.InterfaceAudience;
 class LoadCandidateGenerator extends CandidateGenerator {
 
   @Override
-  BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
+  BalanceAction generate(BalancerClusterState cluster) {
     cluster.sortServersByRegionCount();
     int thisServer = pickMostLoadedServer(cluster, -1);
     int otherServer = pickLeastLoadedServer(cluster, thisServer);
     return pickRandomRegions(cluster, thisServer, otherServer);
   }
 
-  private int pickLeastLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) {
+  private int pickLeastLoadedServer(final BalancerClusterState cluster, int thisServer) {
     Integer[] servers = cluster.serverIndicesSortedByRegionCount;
 
     int index = 0;
@@ -44,7 +44,7 @@ class LoadCandidateGenerator extends CandidateGenerator {
     return servers[index];
   }
 
-  private int pickMostLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) {
+  private int pickMostLoadedServer(final BalancerClusterState cluster, int thisServer) {
     Integer[] servers = cluster.serverIndicesSortedByRegionCount;
 
     int index = servers.length - 1;
@@ -26,7 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 class LocalityBasedCandidateGenerator extends CandidateGenerator {

   @Override
-  BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
+  BalanceAction generate(BalancerClusterState cluster) {
     // iterate through regions until you find one that is not on ideal host
     // start from a random point to avoid always balance the regions in front
     if (cluster.numRegions > 0) {

@@ -35,20 +35,20 @@ class LocalityBasedCandidateGenerator extends CandidateGenerator {
         int region = (startIndex + i) % cluster.numRegions;
         int currentServer = cluster.regionIndexToServerIndex[region];
         if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(
-            BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]) {
-          Optional<BaseLoadBalancer.Cluster.Action> potential = tryMoveOrSwap(cluster,
+            BalancerClusterState.LocalityType.SERVER)[region]) {
+          Optional<BalanceAction> potential = tryMoveOrSwap(cluster,
             currentServer, region, cluster.getOrComputeRegionsToMostLocalEntities(
-              BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]);
+              BalancerClusterState.LocalityType.SERVER)[region]);
           if (potential.isPresent()) {
             return potential.get();
           }
         }
       }
     }
-    return BaseLoadBalancer.Cluster.NullAction;
+    return BalanceAction.NULL_ACTION;
   }

-  private Optional<BaseLoadBalancer.Cluster.Action> tryMoveOrSwap(BaseLoadBalancer.Cluster cluster,
+  private Optional<BalanceAction> tryMoveOrSwap(BalancerClusterState cluster,
     int fromServer, int fromRegion, int toServer) {
     // Try move first. We know apriori fromRegion has the highest locality on toServer
     if (cluster.serverHasTooFewRegions(toServer)) {

@@ -74,8 +74,8 @@ class LocalityBasedCandidateGenerator extends CandidateGenerator {
     return Optional.empty();
   }

-  private double getWeightedLocality(BaseLoadBalancer.Cluster cluster, int region, int server) {
+  private double getWeightedLocality(BalancerClusterState cluster, int region, int server) {
     return cluster.getOrComputeWeightedLocality(region, server,
-      BaseLoadBalancer.Cluster.LocalityType.SERVER);
+      BalancerClusterState.LocalityType.SERVER);
   }
 }
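Note: `BalanceAction.NULL_ACTION` replaces the old `Cluster.NullAction` constant but keeps the null-object pattern: generators always return a real object, and callers test the action type instead of checking for null. A minimal caller-side sketch, assuming a generator and cluster as in the hunks above:

    BalanceAction action = generator.generate(cluster);
    if (action.getType() != BalanceAction.Type.NULL) {
      cluster.doAction(action); // only non-NULL actions mutate the cluster state
    }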
@@ -83,10 +83,10 @@ class RegionReplicaCandidateGenerator extends CandidateGenerator {
   }

   @Override
-  BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
+  BalanceAction generate(BalancerClusterState cluster) {
     int serverIndex = pickRandomServer(cluster);
     if (cluster.numServers <= 1 || serverIndex == -1) {
-      return BaseLoadBalancer.Cluster.NullAction;
+      return BalanceAction.NULL_ACTION;
     }

     int regionIndex = selectCoHostedRegionPerGroup(
@@ -256,7 +256,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {

     // construct a Cluster object with clusterMap and rest of the
     // argument as defaults
-    Cluster c = new Cluster(loadOfOneTable, null, this.regionFinder, this.rackManager);
+    BalancerClusterState c =
+      new BalancerClusterState(loadOfOneTable, null, this.regionFinder, this.rackManager);
     if (!this.needsBalance(tableName, c) && !this.overallNeedsBalance()) {
       return null;
     }
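Note: the BalancerClusterState constructor keeps the old Cluster signature: the assignment map, an optional per-region load history, an optional region locality finder, and an optional rack manager. A minimal standalone construction sketch (hypothetical snippet; nulls are accepted for the optional arguments, as the test hunks below also show):

    import java.util.List;
    import java.util.TreeMap;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.RegionInfo;

    class ClusterStateExample {
      BalancerClusterState build() {
        TreeMap<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
        // loads, regionFinder and rackManager may be null when not needed
        return new BalancerClusterState(assignments, null, null, null);
      }
    }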
@@ -39,12 +39,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.BalancerDecision;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
+import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
 import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
 import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
 import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;

@@ -309,7 +304,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   }

   @Override
-  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
+  protected synchronized boolean areSomeRegionReplicasColocated(BalancerClusterState c) {
     regionReplicaHostCostFunction.init(c);
     if (regionReplicaHostCostFunction.cost() > 0) {
       return true;

@@ -322,7 +317,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   }

   @Override
-  protected boolean needsBalance(TableName tableName, Cluster cluster) {
+  protected boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
     ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
     if (cs.getNumServers() < MIN_SERVER_BALANCE) {
       if (LOG.isDebugEnabled()) {

@@ -369,7 +364,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     return !balanced;
   }

-  Cluster.Action nextAction(Cluster cluster) {
+  BalanceAction nextAction(BalancerClusterState cluster) {
     return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
             .generate(cluster);
   }
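Note: nextAction is the entire proposal step of the balancer: pick one of the registered candidate generators uniformly at random and ask it for an action. Restated in isolation (same names as in the hunk above, with the class fields passed explicitly):

    BalanceAction next(List<CandidateGenerator> generators, Random random,
        BalancerClusterState cluster) {
      CandidateGenerator generator = generators.get(random.nextInt(generators.size()));
      return generator.generate(cluster);
    }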
@@ -394,7 +389,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     //The clusterState that is given to this method contains the state
     //of all the regions in the table(s) (that's true today)
     // Keep track of servers to iterate through them.
-    Cluster cluster = new Cluster(loadOfOneTable, loads, finder, rackManager);
+    BalancerClusterState cluster =
+      new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);

     long startTime = EnvironmentEdgeManager.currentTime();

@@ -434,9 +430,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     long step;

     for (step = 0; step < computedMaxSteps; step++) {
-      Cluster.Action action = nextAction(cluster);
+      BalanceAction action = nextAction(cluster);

-      if (action.type == Type.NULL) {
+      if (action.getType() == BalanceAction.Type.NULL) {
         continue;
       }

@@ -455,7 +451,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       } else {
         // Put things back the way they were before.
         // TODO: undo by remembering old values
-        Action undoAction = action.undoAction();
+        BalanceAction undoAction = action.undoAction();
         cluster.doAction(undoAction);
         updateCostsWithAction(cluster, undoAction);
       }
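Note: the two hunks above are the propose/accept/undo core of the balancer's random walk. A condensed sketch of the surrounding loop, using the same method names as in the diff (the real loop also handles timing and step accounting):

    double currentCost = computeCost(cluster, Double.MAX_VALUE);
    for (long step = 0; step < computedMaxSteps; step++) {
      BalanceAction action = nextAction(cluster);
      if (action.getType() == BalanceAction.Type.NULL) {
        continue; // the generator had nothing to propose
      }
      cluster.doAction(action);
      updateCostsWithAction(cluster, action);
      double newCost = computeCost(cluster, currentCost);
      if (newCost < currentCost) {
        currentCost = newCost; // keep the improvement
      } else {
        BalanceAction undoAction = action.undoAction(); // revert the trial move
        cluster.doAction(undoAction);
        updateCostsWithAction(cluster, undoAction);
      }
    }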
@@ -576,7 +572,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * @param cluster The state of the cluster
    * @return List of RegionPlan's that represent the moves needed to get to desired final state.
    */
-  private List<RegionPlan> createRegionPlans(Cluster cluster) {
+  private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) {
     List<RegionPlan> plans = new LinkedList<>();
     for (int regionIndex = 0;
          regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {

@@ -627,13 +623,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }
   }

-  protected void initCosts(Cluster cluster) {
+  protected void initCosts(BalancerClusterState cluster) {
     for (CostFunction c:costFunctions) {
       c.init(cluster);
     }
   }

-  protected void updateCostsWithAction(Cluster cluster, Action action) {
+  protected void updateCostsWithAction(BalancerClusterState cluster, BalanceAction action) {
     for (CostFunction c : costFunctions) {
       c.postAction(action);
     }

@@ -662,7 +658,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * @return a double of a cost associated with the proposed cluster state. This cost is an
    *         aggregate of all individual cost functions.
    */
-  protected double computeCost(Cluster cluster, double previousCost) {
+  protected double computeCost(BalancerClusterState cluster, double previousCost) {
     double total = 0;

     for (int i = 0; i < costFunctions.size(); i++) {
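Note: computeCost aggregates the individual cost functions into one number. A sketch of the shape of that aggregation (getMultiplier is the per-function weight accessor; the early exit once previousCost is exceeded is an assumption inferred from the previousCost parameter):

    double total = 0;
    for (CostFunction c : costFunctions) {
      float multiplier = c.getMultiplier();
      if (multiplier <= 0) {
        continue; // disabled cost functions contribute nothing
      }
      total += multiplier * c.cost();
      if (total > previousCost) {
        break; // already worse than the cost we are comparing against
      }
    }
    return total;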
@@ -690,7 +686,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   static class RandomCandidateGenerator extends CandidateGenerator {

     @Override
-    Cluster.Action generate(Cluster cluster) {
+    BalanceAction generate(BalancerClusterState cluster) {

       int thisServer = pickRandomServer(cluster);

@@ -707,7 +703,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    */
   static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
     @Override
-    Cluster.Action generate(Cluster cluster) {
+    BalanceAction generate(BalancerClusterState cluster) {
       int rackIndex = pickRandomRack(cluster);
       if (cluster.numRacks <= 1 || rackIndex == -1) {
         return super.generate(cluster);

@@ -741,7 +737,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {

     private float multiplier = 0;

-    protected Cluster cluster;
+    protected BalancerClusterState cluster;

     public CostFunction(Configuration c) {
     }

@@ -760,7 +756,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     /** Called once per LB invocation to give the cost function
      * to initialize it's state, and perform any costly calculation.
      */
-    void init(Cluster cluster) {
+    void init(BalancerClusterState cluster) {
       this.cluster = cluster;
     }

@@ -768,24 +764,25 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
      * an opportunity to update it's state. postAction() is always
      * called at least once before cost() is called with the cluster
      * that this action is performed on. */
-    void postAction(Action action) {
-      switch (action.type) {
-      case NULL: break;
-      case ASSIGN_REGION:
-        AssignRegionAction ar = (AssignRegionAction) action;
-        regionMoved(ar.region, -1, ar.server);
-        break;
-      case MOVE_REGION:
-        MoveRegionAction mra = (MoveRegionAction) action;
-        regionMoved(mra.region, mra.fromServer, mra.toServer);
-        break;
-      case SWAP_REGIONS:
-        SwapRegionsAction a = (SwapRegionsAction) action;
-        regionMoved(a.fromRegion, a.fromServer, a.toServer);
-        regionMoved(a.toRegion, a.toServer, a.fromServer);
-        break;
-      default:
-        throw new RuntimeException("Uknown action:" + action.type);
+    void postAction(BalanceAction action) {
+      switch (action.getType()) {
+        case NULL:
+          break;
+        case ASSIGN_REGION:
+          AssignRegionAction ar = (AssignRegionAction) action;
+          regionMoved(ar.getRegion(), -1, ar.getServer());
+          break;
+        case MOVE_REGION:
+          MoveRegionAction mra = (MoveRegionAction) action;
+          regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
+          break;
+        case SWAP_REGIONS:
+          SwapRegionsAction a = (SwapRegionsAction) action;
+          regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
+          regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
+          break;
+        default:
+          throw new RuntimeException("Uknown action:" + action.getType());
       }
     }

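Note: with the actions promoted to top-level classes their fields become private, so consumers now switch on getType() and read through getters, as postAction does above. A minimal consumer in the same style (a sketch, not part of this commit; LOG stands in for any slf4j logger):

    void log(BalanceAction action) {
      switch (action.getType()) {
        case MOVE_REGION:
          MoveRegionAction move = (MoveRegionAction) action;
          LOG.debug("move region {}: server {} -> {}",
            move.getRegion(), move.getFromServer(), move.getToServer());
          break;
        default:
          break; // NULL, ASSIGN_REGION and SWAP_REGIONS elided for brevity
      }
    }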
@@ -936,7 +933,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }

     @Override
-    void init(Cluster cluster) {
+    void init(BalancerClusterState cluster) {
       super.init(cluster);
       LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
         cluster.numServers, cluster.numRegions);

@@ -1060,7 +1057,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     abstract int regionIndexToEntityIndex(int region);

     @Override
-    void init(Cluster cluster) {
+    void init(BalancerClusterState cluster) {
       super.init(cluster);
       locality = 0.0;
       bestLocality = 0.0;

@@ -1310,7 +1307,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }

     @Override
-    void init(Cluster cluster) {
+    void init(BalancerClusterState cluster) {
       super.init(cluster);
       // max cost is the case where every region replica is hosted together regardless of host
       maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;

@@ -1323,7 +1320,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       }
     }

-    long getMaxCost(Cluster cluster) {
+    long getMaxCost(BalancerClusterState cluster) {
       if (!cluster.hasRegionReplicas) {
         return 0; // short circuit
       }

@@ -1422,7 +1419,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }

     @Override
-    void init(Cluster cluster) {
+    void init(BalancerClusterState cluster) {
       this.cluster = cluster;
       if (cluster.numRacks <= 1) {
         maxCost = 0;
@@ -376,8 +376,8 @@ public class BalancerTestBase {
     return mockClusterServers(mockCluster, -1);
   }

-  protected BaseLoadBalancer.Cluster mockCluster(int[] mockCluster) {
-    return new BaseLoadBalancer.Cluster(
+  protected BalancerClusterState mockCluster(int[] mockCluster) {
+    return new BalancerClusterState(
       mockClusterServers(mockCluster, -1), null, null, null);
   }

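Note: mockCluster keeps its meaning and only its return type changes; each entry of the int array is read as the region count of one mocked server (an assumption based on how mockClusterServers is used throughout these tests). Typical use in the tests below, sketched:

    BalancerClusterState cluster = mockCluster(new int[] { 3, 1, 7 }); // three servers
    costFunction.init(cluster);
    assertTrue(costFunction.cost() >= 0);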
@@ -47,8 +47,6 @@ import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.ServerManager;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;

@@ -301,7 +299,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
     // cluster is created (constructor code) would make sure the indices of
     // the servers are in the order in which it is inserted in the clusterState
     // map (linkedhashmap is important). A similar thing applies to the region lists
-    Cluster cluster = new Cluster(clusterState, null, null, rackManager);
+    BalancerClusterState cluster = new BalancerClusterState(clusterState, null, null, rackManager);
     // check whether a move of region1 from servers[0] to servers[1] would lower
     // the availability of region1
     assertTrue(cluster.wouldLowerAvailability(hri1, servers[1]));

@@ -318,7 +316,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
     // now lets have servers[1] host replica_of_region2
     list1.add(RegionReplicaUtil.getRegionInfoForReplica(hri3, 1));
     // create a new clusterState with the above change
-    cluster = new Cluster(clusterState, null, null, rackManager);
+    cluster = new BalancerClusterState(clusterState, null, null, rackManager);
     // now check whether a move of a replica from servers[0] to servers[1] would lower
     // the availability of region2
     assertTrue(cluster.wouldLowerAvailability(hri3, servers[1]));

@@ -330,14 +328,14 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
     clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2
     clusterState.put(servers[10], new ArrayList<>()); //servers[10], rack3 hosts no region
     // create a cluster with the above clusterState
-    cluster = new Cluster(clusterState, null, null, rackManager);
+    cluster = new BalancerClusterState(clusterState, null, null, rackManager);
     // check whether a move of region1 from servers[0],rack1 to servers[6],rack2 would
     // lower the availability

     assertTrue(cluster.wouldLowerAvailability(hri1, servers[0]));

     // now create a cluster without the rack manager
-    cluster = new Cluster(clusterState, null, null, null);
+    cluster = new BalancerClusterState(clusterState, null, null, null);
     // now repeat check whether a move of region1 from servers[0] to servers[6] would
     // lower the availability
     assertTrue(!cluster.wouldLowerAvailability(hri1, servers[6]));
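Note: the availability checks above all encode one rule: placing a region on a server (or, when a RackManager is supplied, a host/rack) that already carries another replica of the same region lowers availability. Sketched as one assertion pair, reusing names from the hunks above:

    // servers[1] already hosts a replica of region1, so the move would hurt availability
    assertTrue(cluster.wouldLowerAvailability(hri1, servers[1]));
    // a server holding no replica of region1 is a safe destination
    assertFalse(cluster.wouldLowerAvailability(hri1, servers[2]));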
@@ -375,7 +373,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
     // cluster is created (constructor code) would make sure the indices of
     // the servers are in the order in which it is inserted in the clusterState
     // map (linkedhashmap is important).
-    Cluster cluster = new Cluster(clusterState, null, null, rackManager);
+    BalancerClusterState cluster = new BalancerClusterState(clusterState, null, null, rackManager);
     // check whether moving region1 from servers[1] to servers[2] would lower availability
     assertTrue(!cluster.wouldLowerAvailability(hri1, servers[2]));

@@ -397,7 +395,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
     clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2
     clusterState.put(servers[12], list3); //servers[12], rack3 hosts replica_of_region2
     // create a cluster with the above clusterState
-    cluster = new Cluster(clusterState, null, null, rackManager);
+    cluster = new BalancerClusterState(clusterState, null, null, rackManager);
     // check whether a move of replica_of_region2 from servers[12],rack3 to servers[0],rack1 would
     // lower the availability
     assertTrue(!cluster.wouldLowerAvailability(hri4, servers[0]));

@@ -479,7 +477,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
     assignRegions(regions, oldServers, clusterState);

     // should not throw exception:
-    BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, null, null);
+    BalancerClusterState cluster = new BalancerClusterState(clusterState, null, null, null);
     assertEquals(101 + 9, cluster.numRegions);
     assertEquals(10, cluster.numServers); // only 10 servers because they share the same host + port

@@ -525,12 +523,15 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
       Lists.newArrayList(servers.get(0), servers.get(1)));
     when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn(
       Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
-    when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn(
-      Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus
+    // this server does not exists in clusterStatus
+    when(locationFinder.getTopBlockLocations(regions.get(43)))
+      .thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0)));

-    BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null);
+    BalancerClusterState cluster =
+      new BalancerClusterState(clusterState, null, locationFinder, null);

-    int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0)); // this is ok, it is just a test
+    // this is ok, it is just a test
+    int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0));
     int r1 = ArrayUtils.indexOf(cluster.regions, regions.get(1));
     int r10 = ArrayUtils.indexOf(cluster.regions, regions.get(10));
     int r42 = ArrayUtils.indexOf(cluster.regions, regions.get(42));
@@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;

@@ -184,9 +183,10 @@ public class TestFavoredStochasticBalancerPickers extends BalancerTestBase {
     regionFinder.setConf(conf);
     regionFinder.setClusterInfoProvider(
       new MasterClusterInfoProvider(TEST_UTIL.getMiniHBaseCluster().getMaster()));
-    Cluster cluster = new Cluster(serverAssignments, null, regionFinder, new RackManager(conf));
+    BalancerClusterState cluster =
+      new BalancerClusterState(serverAssignments, null, regionFinder, new RackManager(conf));
     LoadOnlyFavoredStochasticBalancer balancer = (LoadOnlyFavoredStochasticBalancer) TEST_UTIL
-        .getMiniHBaseCluster().getMaster().getLoadBalancer().getInternalBalancer();
+      .getMiniHBaseCluster().getMaster().getLoadBalancer().getInternalBalancer();

     cluster.sortServersByRegionCount();
     Integer[] servers = cluster.serverIndicesSortedByRegionCount;

@@ -204,13 +204,13 @@ public class TestFavoredStochasticBalancerPickers extends BalancerTestBase {
       if (userRegionPicked) {
         break;
       } else {
-        Cluster.Action action = loadPicker.generate(cluster);
-        if (action.type == Cluster.Action.Type.MOVE_REGION) {
-          Cluster.MoveRegionAction moveRegionAction = (Cluster.MoveRegionAction) action;
-          RegionInfo region = cluster.regions[moveRegionAction.region];
-          assertNotEquals(-1, moveRegionAction.toServer);
-          ServerName destinationServer = cluster.servers[moveRegionAction.toServer];
-          assertEquals(cluster.servers[moveRegionAction.fromServer], mostLoadedServer);
+        BalanceAction action = loadPicker.generate(cluster);
+        if (action.getType() == BalanceAction.Type.MOVE_REGION) {
+          MoveRegionAction moveRegionAction = (MoveRegionAction) action;
+          RegionInfo region = cluster.regions[moveRegionAction.getRegion()];
+          assertNotEquals(-1, moveRegionAction.getToServer());
+          ServerName destinationServer = cluster.servers[moveRegionAction.getToServer()];
+          assertEquals(cluster.servers[moveRegionAction.getFromServer()], mostLoadedServer);
         if (!region.getTable().isSystemTable()) {
           List<ServerName> favNodes = fnm.getFavoredNodes(region);
           assertTrue(favNodes.contains(
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.MockNoopMasterServices;
 import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
 import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.ServerLocalityCostFunction;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;

@@ -295,7 +294,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     Configuration conf = HBaseConfiguration.create();
     StochasticLoadBalancer.CostFunction
         costFunction = new StochasticLoadBalancer.MoveCostFunction(conf);
-    BaseLoadBalancer.Cluster cluster = mockCluster(clusterStateMocks[0]);
+    BalancerClusterState cluster = mockCluster(clusterStateMocks[0]);
     costFunction.init(cluster);
     costFunction.cost();
     assertEquals(StochasticLoadBalancer.MoveCostFunction.DEFAULT_MOVE_COST,

@@ -322,7 +321,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     StochasticLoadBalancer.CostFunction
         costFunction = new StochasticLoadBalancer.MoveCostFunction(conf);
     for (int[] mockCluster : clusterStateMocks) {
-      BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
+      BalancerClusterState cluster = mockCluster(mockCluster);
       costFunction.init(cluster);
       double cost = costFunction.cost();
       assertEquals(0.0f, cost, 0.001);

@@ -385,14 +384,14 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     final int runs = 10;
     loadBalancer.setConf(conf);
     for (int[] mockCluster : clusterStateMocks) {
-      BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
+      BalancerClusterState cluster = mockCluster(mockCluster);
       loadBalancer.initCosts(cluster);
       for (int i = 0; i != runs; ++i) {
         final double expectedCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
-        Cluster.Action action = loadBalancer.nextAction(cluster);
+        BalanceAction action = loadBalancer.nextAction(cluster);
         cluster.doAction(action);
         loadBalancer.updateCostsWithAction(cluster, action);
-        Cluster.Action undoAction = action.undoAction();
+        BalanceAction undoAction = action.undoAction();
         cluster.doAction(undoAction);
         loadBalancer.updateCostsWithAction(cluster, undoAction);
         final double actualCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);

@@ -407,7 +406,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     StochasticLoadBalancer.CostFunction
         costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf);
     for (int[] mockCluster : clusterStateMocks) {
-      BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
+      BalancerClusterState cluster = mockCluster(mockCluster);
       costFunction.init(cluster);
       double cost = costFunction.cost();
       assertTrue(cost >= 0);

@@ -548,7 +547,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
   }

   // This mock allows us to test the LocalityCostFunction
-  private class MockCluster extends BaseLoadBalancer.Cluster {
+  private class MockCluster extends BalancerClusterState {

     private int[][] localities = null; // [region][server] = percent of blocks

@@ -197,8 +197,8 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
     final HeterogeneousRegionCountCostFunction cf =
       new HeterogeneousRegionCountCostFunction(conf);
     assertNotNull(cf);
-    BaseLoadBalancer.Cluster cluster =
-      new BaseLoadBalancer.Cluster(serverMap, null, null, null);
+    BalancerClusterState cluster =
+      new BalancerClusterState(serverMap, null, null, null);
     cf.init(cluster);

     // checking that we all hosts have a number of regions below their limit

@@ -285,10 +285,10 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
       StochasticLoadBalancer.RandomCandidateGenerator {

     @Override
-    public BaseLoadBalancer.Cluster.Action pickRandomRegions(BaseLoadBalancer.Cluster cluster,
+    public BalanceAction pickRandomRegions(BalancerClusterState cluster,
       int thisServer, int otherServer) {
       if (thisServer < 0 || otherServer < 0) {
-        return BaseLoadBalancer.Cluster.NullAction;
+        return BalanceAction.NULL_ACTION;
       }

       int thisRegion = pickRandomRegion(cluster, thisServer, 0.5);

@@ -298,7 +298,7 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
     }

     @Override
-    BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
+    BalanceAction generate(BalancerClusterState cluster) {
       return super.generate(cluster);
     }
   }
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.RackManager;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.junit.ClassRule;

@@ -55,7 +54,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
     StochasticLoadBalancer.CostFunction costFunction =
       new StochasticLoadBalancer.RegionReplicaHostCostFunction(conf);
     for (int[] mockCluster : clusterStateMocks) {
-      BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
+      BalancerClusterState cluster = mockCluster(mockCluster);
       costFunction.init(cluster);
       double cost = costFunction.cost();
       assertTrue(cost >= 0);

@@ -72,9 +71,9 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
     int[] servers = new int[] { 3, 3, 3, 3, 3 };
     TreeMap<ServerName, List<RegionInfo>> clusterState = mockClusterServers(servers);

-    BaseLoadBalancer.Cluster cluster;
+    BalancerClusterState cluster;

-    cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
+    cluster = new BalancerClusterState(clusterState, null, null, null);
     costFunction.init(cluster);
     double costWithoutReplicas = costFunction.cost();
     assertEquals(0, costWithoutReplicas, 0);

@@ -84,7 +83,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
       RegionReplicaUtil.getRegionInfoForReplica(clusterState.firstEntry().getValue().get(0), 1);
     clusterState.lastEntry().getValue().add(replica1);

-    cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
+    cluster = new BalancerClusterState(clusterState, null, null, null);
     costFunction.init(cluster);
     double costWith1ReplicaDifferentServer = costFunction.cost();

@@ -94,7 +93,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
     RegionInfo replica2 = RegionReplicaUtil.getRegionInfoForReplica(replica1, 2);
     clusterState.lastEntry().getValue().add(replica2);

-    cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
+    cluster = new BalancerClusterState(clusterState, null, null, null);
     costFunction.init(cluster);
     double costWith1ReplicaSameServer = costFunction.cost();

@@ -117,7 +116,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
     entry.getValue().add(replica2);
     it.next().getValue().add(replica3); // 2nd server

-    cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
+    cluster = new BalancerClusterState(clusterState, null, null, null);
     costFunction.init(cluster);
     double costWith3ReplicasSameServer = costFunction.cost();

@@ -131,7 +130,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
     clusterState.lastEntry().getValue().add(replica2);
     clusterState.lastEntry().getValue().add(replica3);

-    cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
+    cluster = new BalancerClusterState(clusterState, null, null, null);
     costFunction.init(cluster);
     double costWith2ReplicasOnTwoServers = costFunction.cost();

@@ -152,7 +151,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
     regions = randomRegions(1);
     map.put(s2, regions);
     assertTrue(loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME,
-      new Cluster(map, null, null, null)));
+      new BalancerClusterState(map, null, null, null)));
     // check for the case where there are two hosts on the same rack and there are two racks
     // and both the replicas are on the same rack
     map.clear();

@@ -165,7 +164,7 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
     map.put(ServerName.valueOf("host2", 1000, 11111), randomRegions(1));
     assertTrue(
       loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME,
-        new Cluster(map, null, null, new ForTestRackManagerOne())));
+        new BalancerClusterState(map, null, null, new ForTestRackManagerOne())));
   }

   @Test