HBASE-26178 Improve data structure and algorithm for BalanceClusterState to improve computation speed for large cluster (#3682)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
d956828aac
commit
2b26dfbaf4
|
@ -100,6 +100,11 @@
|
|||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.agrona</groupId>
|
||||
<artifactId>agrona</artifactId>
|
||||
<version>1.12.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.stephenc.findbugs</groupId>
|
||||
<artifactId>findbugs-annotations</artifactId>
|
||||
|
|
|
@ -26,6 +26,8 @@ import java.util.Deque;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.agrona.collections.Hashing;
|
||||
import org.agrona.collections.Int2IntCounterMap;
|
||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
@ -70,10 +72,12 @@ class BalancerClusterState {
|
|||
int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list
|
||||
int[][] regionsPerHost; // hostIndex -> list of regions
|
||||
int[][] regionsPerRack; // rackIndex -> region list
|
||||
int[][] primariesOfRegionsPerServer; // serverIndex -> sorted list of regions by primary region
|
||||
// index
|
||||
int[][] primariesOfRegionsPerHost; // hostIndex -> sorted list of regions by primary region index
|
||||
int[][] primariesOfRegionsPerRack; // rackIndex -> sorted list of regions by primary region index
|
||||
Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated
|
||||
// replicas by primary region index
|
||||
Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by
|
||||
// primary region index
|
||||
Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by
|
||||
// primary region index
|
||||
|
||||
int[][] serversPerHost; // hostIndex -> list of server indexes
|
||||
int[][] serversPerRack; // rackIndex -> list of server indexes
|
||||
|
@ -211,9 +215,9 @@ class BalancerClusterState {
|
|||
serverIndexToRegionsOffset = new int[numServers];
|
||||
regionsPerHost = new int[numHosts][];
|
||||
regionsPerRack = new int[numRacks][];
|
||||
primariesOfRegionsPerServer = new int[numServers][];
|
||||
primariesOfRegionsPerHost = new int[numHosts][];
|
||||
primariesOfRegionsPerRack = new int[numRacks][];
|
||||
colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers];
|
||||
colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts];
|
||||
colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks];
|
||||
|
||||
int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
|
||||
|
||||
|
@ -239,7 +243,8 @@ class BalancerClusterState {
|
|||
} else {
|
||||
regionsPerServer[serverIndex] = new int[entry.getValue().size()];
|
||||
}
|
||||
primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
|
||||
colocatedReplicaCountsPerServer[serverIndex] = new Int2IntCounterMap(
|
||||
regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
|
||||
serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
|
||||
serverIndicesSortedByLocality[serverIndex] = serverIndex;
|
||||
}
|
||||
|
@ -342,67 +347,52 @@ class BalancerClusterState {
|
|||
}
|
||||
|
||||
for (int i = 0; i < regionsPerServer.length; i++) {
|
||||
primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
|
||||
colocatedReplicaCountsPerServer[i] = new Int2IntCounterMap(
|
||||
regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
|
||||
for (int j = 0; j < regionsPerServer[i].length; j++) {
|
||||
int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
|
||||
primariesOfRegionsPerServer[i][j] = primaryIndex;
|
||||
colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex);
|
||||
}
|
||||
// sort the regions by primaries.
|
||||
Arrays.sort(primariesOfRegionsPerServer[i]);
|
||||
}
|
||||
|
||||
// compute regionsPerHost
|
||||
if (multiServersPerHost) {
|
||||
for (int i = 0; i < serversPerHost.length; i++) {
|
||||
int numRegionsPerHost = 0;
|
||||
for (int j = 0; j < serversPerHost[i].length; j++) {
|
||||
numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
|
||||
}
|
||||
regionsPerHost[i] = new int[numRegionsPerHost];
|
||||
primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
|
||||
}
|
||||
for (int i = 0; i < serversPerHost.length; i++) {
|
||||
int numRegionPerHostIndex = 0;
|
||||
for (int j = 0; j < serversPerHost[i].length; j++) {
|
||||
for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
|
||||
int region = regionsPerServer[serversPerHost[i][j]][k];
|
||||
regionsPerHost[i][numRegionPerHostIndex] = region;
|
||||
int primaryIndex = regionIndexToPrimaryIndex[region];
|
||||
primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
|
||||
numRegionPerHostIndex++;
|
||||
}
|
||||
}
|
||||
// sort the regions by primaries.
|
||||
Arrays.sort(primariesOfRegionsPerHost[i]);
|
||||
}
|
||||
populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost,
|
||||
serversPerHost);
|
||||
}
|
||||
|
||||
// compute regionsPerRack
|
||||
if (numRacks > 1) {
|
||||
for (int i = 0; i < serversPerRack.length; i++) {
|
||||
int numRegionsPerRack = 0;
|
||||
for (int j = 0; j < serversPerRack[i].length; j++) {
|
||||
numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
|
||||
}
|
||||
regionsPerRack[i] = new int[numRegionsPerRack];
|
||||
primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
|
||||
}
|
||||
populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack,
|
||||
serversPerRack);
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < serversPerRack.length; i++) {
|
||||
int numRegionPerRackIndex = 0;
|
||||
for (int j = 0; j < serversPerRack[i].length; j++) {
|
||||
for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
|
||||
int region = regionsPerServer[serversPerRack[i][j]][k];
|
||||
regionsPerRack[i][numRegionPerRackIndex] = region;
|
||||
int primaryIndex = regionIndexToPrimaryIndex[region];
|
||||
primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
|
||||
numRegionPerRackIndex++;
|
||||
}
|
||||
private void populateRegionPerLocationFromServer(int[][] regionsPerLocation,
|
||||
Int2IntCounterMap[] colocatedReplicaCountsPerLocation,
|
||||
int[][] serversPerLocation) {
|
||||
for (int i = 0; i < serversPerLocation.length; i++) {
|
||||
int numRegionsPerLocation = 0;
|
||||
for (int j = 0; j < serversPerLocation[i].length; j++) {
|
||||
numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length;
|
||||
}
|
||||
regionsPerLocation[i] = new int[numRegionsPerLocation];
|
||||
colocatedReplicaCountsPerLocation[i] = new Int2IntCounterMap(numRegionsPerLocation,
|
||||
Hashing.DEFAULT_LOAD_FACTOR, 0);
|
||||
}
|
||||
|
||||
for (int i = 0; i < serversPerLocation.length; i++) {
|
||||
int numRegionPerLocationIndex = 0;
|
||||
for (int j = 0; j < serversPerLocation[i].length; j++) {
|
||||
for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) {
|
||||
int region = regionsPerServer[serversPerLocation[i][j]][k];
|
||||
regionsPerLocation[i][numRegionPerLocationIndex] = region;
|
||||
int primaryIndex = regionIndexToPrimaryIndex[region];
|
||||
colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex);
|
||||
numRegionPerLocationIndex++;
|
||||
}
|
||||
// sort the regions by primaries.
|
||||
Arrays.sort(primariesOfRegionsPerRack[i]);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** Helper for Cluster constructor to handle a region */
|
||||
|
@ -608,7 +598,7 @@ class BalancerClusterState {
|
|||
boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
|
||||
if (!serversToIndex.containsKey(serverName.getAddress())) {
|
||||
return false; // safeguard against race between cluster.servers and servers from LB method
|
||||
// args
|
||||
// args
|
||||
}
|
||||
int server = serversToIndex.get(serverName.getAddress());
|
||||
int region = regionsToIndex.get(regionInfo);
|
||||
|
@ -627,48 +617,51 @@ class BalancerClusterState {
|
|||
}
|
||||
// there is a subset relation for server < host < rack
|
||||
// check server first
|
||||
if (contains(primariesOfRegionsPerServer[server], primary)) {
|
||||
// check for whether there are other servers that we can place this region
|
||||
for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
|
||||
if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
|
||||
return true; // meaning there is a better server
|
||||
}
|
||||
}
|
||||
return false; // there is not a better server to place this
|
||||
int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary);
|
||||
if (result != 0) {
|
||||
return result > 0;
|
||||
}
|
||||
|
||||
// check host
|
||||
if (multiServersPerHost) {
|
||||
// these arrays would only be allocated if we have more than one server per host
|
||||
int host = serverIndexToHostIndex[server];
|
||||
if (contains(primariesOfRegionsPerHost[host], primary)) {
|
||||
// check for whether there are other hosts that we can place this region
|
||||
for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
|
||||
if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
|
||||
return true; // meaning there is a better host
|
||||
}
|
||||
}
|
||||
return false; // there is not a better host to place this
|
||||
result = checkLocationForPrimary(serverIndexToHostIndex[server],
|
||||
colocatedReplicaCountsPerHost, primary);
|
||||
if (result != 0) {
|
||||
return result > 0;
|
||||
}
|
||||
}
|
||||
|
||||
// check rack
|
||||
if (numRacks > 1) {
|
||||
int rack = serverIndexToRackIndex[server];
|
||||
if (contains(primariesOfRegionsPerRack[rack], primary)) {
|
||||
// check for whether there are other racks that we can place this region
|
||||
for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
|
||||
if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
|
||||
return true; // meaning there is a better rack
|
||||
}
|
||||
}
|
||||
return false; // there is not a better rack to place this
|
||||
result = checkLocationForPrimary(serverIndexToRackIndex[server],
|
||||
colocatedReplicaCountsPerRack, primary);
|
||||
if (result != 0) {
|
||||
return result > 0;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Common method for better solution check.
|
||||
* @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
|
||||
* colocatedReplicaCountsPerRack
|
||||
* @return 1 for better, -1 for no better, 0 for unknown
|
||||
*/
|
||||
private int checkLocationForPrimary(int location,
|
||||
Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) {
|
||||
if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) {
|
||||
// check for whether there are other Locations that we can place this region
|
||||
for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) {
|
||||
if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) {
|
||||
return 1; // meaning there is a better Location
|
||||
}
|
||||
}
|
||||
return -1; // there is not a better Location to place this
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
|
||||
if (!serversToIndex.containsKey(serverName.getAddress())) {
|
||||
return;
|
||||
|
@ -699,45 +692,45 @@ class BalancerClusterState {
|
|||
// update for servers
|
||||
int primary = regionIndexToPrimaryIndex[region];
|
||||
if (oldServer >= 0) {
|
||||
primariesOfRegionsPerServer[oldServer] =
|
||||
removeRegion(primariesOfRegionsPerServer[oldServer], primary);
|
||||
colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary);
|
||||
}
|
||||
primariesOfRegionsPerServer[newServer] =
|
||||
addRegionSorted(primariesOfRegionsPerServer[newServer], primary);
|
||||
colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary);
|
||||
|
||||
// update for hosts
|
||||
if (multiServersPerHost) {
|
||||
int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
|
||||
int newHost = serverIndexToHostIndex[newServer];
|
||||
if (newHost != oldHost) {
|
||||
regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
|
||||
primariesOfRegionsPerHost[newHost] =
|
||||
addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
|
||||
if (oldHost >= 0) {
|
||||
regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
|
||||
primariesOfRegionsPerHost[oldHost] =
|
||||
removeRegion(primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
|
||||
}
|
||||
}
|
||||
updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost,
|
||||
oldServer, newServer, primary, region);
|
||||
}
|
||||
|
||||
// update for racks
|
||||
if (numRacks > 1) {
|
||||
int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
|
||||
int newRack = serverIndexToRackIndex[newServer];
|
||||
if (newRack != oldRack) {
|
||||
regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
|
||||
primariesOfRegionsPerRack[newRack] =
|
||||
addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
|
||||
if (oldRack >= 0) {
|
||||
regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
|
||||
primariesOfRegionsPerRack[oldRack] =
|
||||
removeRegion(primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
|
||||
}
|
||||
}
|
||||
updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack,
|
||||
oldServer, newServer, primary, region);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Common method for per host and per Location region index updates when a region is moved.
|
||||
* @param serverIndexToLocation serverIndexToHostIndex or serverIndexToLocationIndex
|
||||
* @param regionsPerLocation regionsPerHost or regionsPerLocation
|
||||
* @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
|
||||
* colocatedReplicaCountsPerRack
|
||||
*/
|
||||
private void updateForLocation(int[] serverIndexToLocation,
|
||||
int[][] regionsPerLocation,
|
||||
Int2IntCounterMap[] colocatedReplicaCountsPerLocation,
|
||||
int oldServer, int newServer, int primary, int region) {
|
||||
int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1;
|
||||
int newLocation = serverIndexToLocation[newServer];
|
||||
if (newLocation != oldLocation) {
|
||||
regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region);
|
||||
colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary);
|
||||
if (oldLocation >= 0) {
|
||||
regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region);
|
||||
colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
int[] removeRegion(int[] regions, int regionIndex) {
|
||||
// TODO: this maybe costly. Consider using linked lists
|
||||
int[] newRegions = new int[regions.length - 1];
|
||||
|
|
|
@ -19,6 +19,9 @@
|
|||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import org.agrona.collections.Int2IntCounterMap;
|
||||
import org.agrona.collections.IntArrayList;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -33,47 +36,32 @@ class RegionReplicaCandidateGenerator extends CandidateGenerator {
|
|||
/**
|
||||
* Randomly select one regionIndex out of all region replicas co-hosted in the same group (a group
|
||||
* is a server, host or rack)
|
||||
* @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
|
||||
* primariesOfRegionsPerHost or primariesOfRegionsPerRack
|
||||
* @param colocatedReplicaCountsPerGroup either Cluster.colocatedReplicaCountsPerServer,
|
||||
* colocatedReplicaCountsPerHost or colocatedReplicaCountsPerRack
|
||||
* @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
|
||||
* @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
|
||||
* @return a regionIndex for the selected primary or -1 if there is no co-locating
|
||||
*/
|
||||
int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup,
|
||||
int[] regionIndexToPrimaryIndex) {
|
||||
int currentPrimary = -1;
|
||||
int currentPrimaryIndex = -1;
|
||||
int selectedPrimaryIndex = -1;
|
||||
double currentLargestRandom = -1;
|
||||
// primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
|
||||
// ids for the regions hosted in server, a consecutive repetition means that replicas
|
||||
// are co-hosted
|
||||
for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
|
||||
int primary = j < primariesOfRegionsPerGroup.length ? primariesOfRegionsPerGroup[j] : -1;
|
||||
if (primary != currentPrimary) { // check for whether we see a new primary
|
||||
int numReplicas = j - currentPrimaryIndex;
|
||||
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
|
||||
// decide to select this primary region id or not
|
||||
double currentRandom = ThreadLocalRandom.current().nextDouble();
|
||||
// we don't know how many region replicas are co-hosted, we will randomly select one
|
||||
// using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
|
||||
if (currentRandom > currentLargestRandom) {
|
||||
selectedPrimaryIndex = currentPrimary;
|
||||
currentLargestRandom = currentRandom;
|
||||
}
|
||||
}
|
||||
currentPrimary = primary;
|
||||
currentPrimaryIndex = j;
|
||||
int selectCoHostedRegionPerGroup(Int2IntCounterMap colocatedReplicaCountsPerGroup,
|
||||
int[] regionsPerGroup, int[] regionIndexToPrimaryIndex) {
|
||||
final IntArrayList colocated = new IntArrayList(colocatedReplicaCountsPerGroup.size(), -1);
|
||||
colocatedReplicaCountsPerGroup.forEach((primary, count) -> {
|
||||
if (count > 1) { // means consecutive primaries, indicating co-location
|
||||
colocated.add(primary);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// we have found the primary id for the region to move. Now find the actual regionIndex
|
||||
// with the given primary, prefer to move the secondary region.
|
||||
for (int regionIndex : regionsPerGroup) {
|
||||
if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
|
||||
// always move the secondary, not the primary
|
||||
if (selectedPrimaryIndex != regionIndex) {
|
||||
return regionIndex;
|
||||
if (!colocated.isEmpty()) {
|
||||
int rand = ThreadLocalRandom.current().nextInt(colocated.size());
|
||||
int selectedPrimaryIndex = colocated.get(rand);
|
||||
// we have found the primary id for the region to move. Now find the actual regionIndex
|
||||
// with the given primary, prefer to move the secondary region.
|
||||
for (int regionIndex : regionsPerGroup) {
|
||||
if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
|
||||
// always move the secondary, not the primary
|
||||
if (selectedPrimaryIndex != regionIndex) {
|
||||
return regionIndex;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -87,7 +75,8 @@ class RegionReplicaCandidateGenerator extends CandidateGenerator {
|
|||
return BalanceAction.NULL_ACTION;
|
||||
}
|
||||
|
||||
int regionIndex = selectCoHostedRegionPerGroup(cluster.primariesOfRegionsPerServer[serverIndex],
|
||||
int regionIndex = selectCoHostedRegionPerGroup(
|
||||
cluster.colocatedReplicaCountsPerServer[serverIndex],
|
||||
cluster.regionsPerServer[serverIndex], cluster.regionIndexToPrimaryIndex);
|
||||
|
||||
// if there are no pairs of region replicas co-hosted, default to random generator
|
||||
|
@ -100,5 +89,4 @@ class RegionReplicaCandidateGenerator extends CandidateGenerator {
|
|||
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
|
||||
return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,7 +17,11 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.agrona.collections.Hashing;
|
||||
import org.agrona.collections.Int2IntCounterMap;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -27,7 +31,6 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
abstract class RegionReplicaGroupingCostFunction extends CostFunction {
|
||||
|
||||
protected long maxCost = 0;
|
||||
protected long[] costsPerGroup; // group is either server, host or rack
|
||||
|
||||
|
@ -44,14 +47,13 @@ abstract class RegionReplicaGroupingCostFunction extends CostFunction {
|
|||
|
||||
protected final long getMaxCost(BalancerClusterState cluster) {
|
||||
// max cost is the case where every region replica is hosted together regardless of host
|
||||
int[] primariesOfRegions = new int[cluster.numRegions];
|
||||
System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
|
||||
cluster.regions.length);
|
||||
|
||||
Arrays.sort(primariesOfRegions);
|
||||
|
||||
Int2IntCounterMap colocatedReplicaCounts = new Int2IntCounterMap(cluster.numRegions,
|
||||
Hashing.DEFAULT_LOAD_FACTOR, 0);
|
||||
for (int i = 0; i < cluster.regionIndexToPrimaryIndex.length; i++) {
|
||||
colocatedReplicaCounts.getAndIncrement(cluster.regionIndexToPrimaryIndex[i]);
|
||||
}
|
||||
// compute numReplicas from the sorted array
|
||||
return costPerGroup(primariesOfRegions);
|
||||
return costPerGroup(colocatedReplicaCounts);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -77,28 +79,18 @@ abstract class RegionReplicaGroupingCostFunction extends CostFunction {
|
|||
* and returns a sum of numReplicas-1 squared. For example, if the server hosts regions a, b, c,
|
||||
* d, e, f where a and b are same replicas, and c,d,e are same replicas, it returns (2-1) * (2-1)
|
||||
* + (3-1) * (3-1) + (1-1) * (1-1).
|
||||
* @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
|
||||
* @param colocatedReplicaCounts a sorted array of primary regions ids for the regions hosted
|
||||
* @return a sum of numReplicas-1 squared for each primary region in the group.
|
||||
*/
|
||||
protected final long costPerGroup(int[] primariesOfRegions) {
|
||||
long cost = 0;
|
||||
int currentPrimary = -1;
|
||||
int currentPrimaryIndex = -1;
|
||||
// primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
|
||||
protected final long costPerGroup(Int2IntCounterMap colocatedReplicaCounts) {
|
||||
final AtomicLong cost = new AtomicLong(0);
|
||||
// colocatedReplicaCounts is a sorted array of primary ids of regions. Replicas of regions
|
||||
// sharing the same primary will have consecutive numbers in the array.
|
||||
for (int j = 0; j <= primariesOfRegions.length; j++) {
|
||||
int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
|
||||
if (primary != currentPrimary) { // we see a new primary
|
||||
int numReplicas = j - currentPrimaryIndex;
|
||||
// square the cost
|
||||
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
|
||||
cost += (numReplicas - 1) * (numReplicas - 1);
|
||||
}
|
||||
currentPrimary = primary;
|
||||
currentPrimaryIndex = j;
|
||||
colocatedReplicaCounts.forEach((primary,count) -> {
|
||||
if (count > 1) { // means consecutive primaries, indicating co-location
|
||||
cost.getAndAdd((count - 1) * (count - 1));
|
||||
}
|
||||
}
|
||||
|
||||
return cost;
|
||||
});
|
||||
return cost.longValue();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.agrona.collections.Int2IntCounterMap;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
|
@ -32,7 +33,7 @@ class RegionReplicaHostCostFunction extends RegionReplicaGroupingCostFunction {
|
|||
"hbase.master.balancer.stochastic.regionReplicaHostCostKey";
|
||||
private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
|
||||
|
||||
private int[][] primariesOfRegionsPerGroup;
|
||||
private Int2IntCounterMap[] colocatedReplicaCountsPerGroup;
|
||||
|
||||
public RegionReplicaHostCostFunction(Configuration conf) {
|
||||
this.setMultiplier(
|
||||
|
@ -44,10 +45,11 @@ class RegionReplicaHostCostFunction extends RegionReplicaGroupingCostFunction {
|
|||
// max cost is the case where every region replica is hosted together regardless of host
|
||||
maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
|
||||
costsPerGroup = new long[cluster.numHosts];
|
||||
primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
|
||||
? cluster.primariesOfRegionsPerHost : cluster.primariesOfRegionsPerServer;
|
||||
for (int i = 0; i < primariesOfRegionsPerGroup.length; i++) {
|
||||
costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
|
||||
// either server based or host based
|
||||
colocatedReplicaCountsPerGroup = cluster.multiServersPerHost
|
||||
? cluster.colocatedReplicaCountsPerHost : cluster.colocatedReplicaCountsPerServer;
|
||||
for (int i = 0; i < colocatedReplicaCountsPerGroup.length; i++) {
|
||||
costsPerGroup[i] = costPerGroup(colocatedReplicaCountsPerGroup[i]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,12 +62,12 @@ class RegionReplicaHostCostFunction extends RegionReplicaGroupingCostFunction {
|
|||
int oldHost = cluster.serverIndexToHostIndex[oldServer];
|
||||
int newHost = cluster.serverIndexToHostIndex[newServer];
|
||||
if (newHost != oldHost) {
|
||||
costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
|
||||
costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
|
||||
costsPerGroup[oldHost] = costPerGroup(cluster.colocatedReplicaCountsPerHost[oldHost]);
|
||||
costsPerGroup[newHost] = costPerGroup(cluster.colocatedReplicaCountsPerHost[newHost]);
|
||||
}
|
||||
} else {
|
||||
costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
|
||||
costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
|
||||
costsPerGroup[oldServer] = costPerGroup(cluster.colocatedReplicaCountsPerServer[oldServer]);
|
||||
costsPerGroup[newServer] = costPerGroup(cluster.colocatedReplicaCountsPerServer[newServer]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerato
|
|||
return super.generate(cluster);
|
||||
}
|
||||
|
||||
int regionIndex = selectCoHostedRegionPerGroup(cluster.primariesOfRegionsPerRack[rackIndex],
|
||||
int regionIndex = selectCoHostedRegionPerGroup(cluster.colocatedReplicaCountsPerRack[rackIndex],
|
||||
cluster.regionsPerRack[rackIndex], cluster.regionIndexToPrimaryIndex);
|
||||
|
||||
// if there are no pairs of region replicas co-hosted, default to random generator
|
||||
|
@ -50,4 +50,4 @@ class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerato
|
|||
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
|
||||
return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,8 +45,8 @@ class RegionReplicaRackCostFunction extends RegionReplicaGroupingCostFunction {
|
|||
// max cost is the case where every region replica is hosted together regardless of rack
|
||||
maxCost = getMaxCost(cluster);
|
||||
costsPerGroup = new long[cluster.numRacks];
|
||||
for (int i = 0; i < cluster.primariesOfRegionsPerRack.length; i++) {
|
||||
costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
|
||||
for (int i = 0; i < cluster.colocatedReplicaCountsPerRack.length; i++) {
|
||||
costsPerGroup[i] = costPerGroup(cluster.colocatedReplicaCountsPerRack[i]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,8 +58,8 @@ class RegionReplicaRackCostFunction extends RegionReplicaGroupingCostFunction {
|
|||
int oldRack = cluster.serverIndexToRackIndex[oldServer];
|
||||
int newRack = cluster.serverIndexToRackIndex[newServer];
|
||||
if (newRack != oldRack) {
|
||||
costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
|
||||
costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
|
||||
costsPerGroup[oldRack] = costPerGroup(cluster.colocatedReplicaCountsPerRack[oldRack]);
|
||||
costsPerGroup[newRack] = costPerGroup(cluster.colocatedReplicaCountsPerRack[newRack]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,12 +55,12 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends Stochastic
|
|||
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
|
||||
loadBalancer.onConfigurationChange(conf);
|
||||
int numNodes = 30;
|
||||
int numRegions = numNodes * 30;
|
||||
int numNodes = 4;
|
||||
int numRegions = numNodes * 1;
|
||||
int replication = 3; // 3 replicas per region
|
||||
int numRegionsPerServer = 28;
|
||||
int numTables = 10;
|
||||
int numRacks = 4; // all replicas should be on a different rack
|
||||
int numRegionsPerServer = 1;
|
||||
int numTables = 1;
|
||||
int numRacks = 3; // all replicas should be on a different rack
|
||||
Map<ServerName, List<RegionInfo>> serverMap =
|
||||
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
|
||||
RackManager rm = new ForTestRackManager(numRacks);
|
||||
|
|
|
@ -451,6 +451,10 @@
|
|||
<pattern>net/</pattern>
|
||||
<shadedPattern>${shaded.prefix}.net.</shadedPattern>
|
||||
</relocation>
|
||||
<relocation>
|
||||
<pattern>org.agrona</pattern>
|
||||
<shadedPattern>${shaded.prefix}.org.agrona</shadedPattern>
|
||||
</relocation>
|
||||
</relocations>
|
||||
<transformers>
|
||||
<!-- Need to filter out some extraneous license files.
|
||||
|
|
Loading…
Reference in New Issue