HBASE-24140 : Move CandidateGenerator and their implementors out of StochasticLoadBalancer (#1458)

Signed-off-by: Jan Hentschel <jan.hentschel@ultratendency.com>
This commit is contained in:
Viraj Jasani 2020-04-12 19:13:47 +05:30
parent 8a117c9b8e
commit 45622abe2f
No known key found for this signature in database
GPG Key ID: E906DFF511D3E5DB
6 changed files with 413 additions and 315 deletions

View File

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Generates a candidate action to be applied to the cluster for cost function search
*/
@InterfaceAudience.Private
abstract class CandidateGenerator {
abstract BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster);
/**
* From a list of regions pick a random one. Null can be returned which
* {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
* rather than swap.
*
* @param cluster The state of the cluster
* @param server index of the server
* @param chanceOfNoSwap Chance that this will decide to try a move rather
* than a swap.
* @return a random {@link RegionInfo} or null if an asymmetrical move is
* suggested.
*/
int pickRandomRegion(BaseLoadBalancer.Cluster cluster, int server,
double chanceOfNoSwap) {
// Check to see if this is just a move.
if (cluster.regionsPerServer[server].length == 0
|| StochasticLoadBalancer.RANDOM.nextFloat() < chanceOfNoSwap) {
// signal a move only.
return -1;
}
int rand = StochasticLoadBalancer.RANDOM.nextInt(cluster.regionsPerServer[server].length);
return cluster.regionsPerServer[server][rand];
}
int pickRandomServer(BaseLoadBalancer.Cluster cluster) {
if (cluster.numServers < 1) {
return -1;
}
return StochasticLoadBalancer.RANDOM.nextInt(cluster.numServers);
}
int pickRandomRack(BaseLoadBalancer.Cluster cluster) {
if (cluster.numRacks < 1) {
return -1;
}
return StochasticLoadBalancer.RANDOM.nextInt(cluster.numRacks);
}
int pickOtherRandomServer(BaseLoadBalancer.Cluster cluster, int serverIndex) {
if (cluster.numServers < 2) {
return -1;
}
while (true) {
int otherServerIndex = pickRandomServer(cluster);
if (otherServerIndex != serverIndex) {
return otherServerIndex;
}
}
}
int pickOtherRandomRack(BaseLoadBalancer.Cluster cluster, int rackIndex) {
if (cluster.numRacks < 2) {
return -1;
}
while (true) {
int otherRackIndex = pickRandomRack(cluster);
if (otherRackIndex != rackIndex) {
return otherRackIndex;
}
}
}
BaseLoadBalancer.Cluster.Action pickRandomRegions(BaseLoadBalancer.Cluster cluster,
int thisServer, int otherServer) {
if (thisServer < 0 || otherServer < 0) {
return BaseLoadBalancer.Cluster.NullAction;
}
// Decide who is most likely to need another region
int thisRegionCount = cluster.getNumRegions(thisServer);
int otherRegionCount = cluster.getNumRegions(otherServer);
// Assign the chance based upon the above
double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
return getAction(thisServer, thisRegion, otherServer, otherRegion);
}
protected BaseLoadBalancer.Cluster.Action getAction(int fromServer, int fromRegion,
int toServer, int toRegion) {
if (fromServer < 0 || toServer < 0) {
return BaseLoadBalancer.Cluster.NullAction;
}
if (fromRegion > 0 && toRegion > 0) {
return new BaseLoadBalancer.Cluster.SwapRegionsAction(fromServer, fromRegion,
toServer, toRegion);
} else if (fromRegion > 0) {
return new BaseLoadBalancer.Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
} else if (toRegion > 0) {
return new BaseLoadBalancer.Cluster.MoveRegionAction(toRegion, toServer, fromServer);
} else {
return BaseLoadBalancer.Cluster.NullAction;
}
}
/**
* Returns a random iteration order of indexes of an array with size length
*/
List<Integer> getRandomIterationOrder(int length) {
ArrayList<Integer> order = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
order.add(i);
}
Collections.shuffle(order);
return order;
}
}

View File

@ -65,7 +65,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
* RegionServer as the new Primary RegionServer) after a region is recovered. This
* should help provide consistent read latencies for the regions even when their
* primary region servers die. This provides two
* {@link org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CandidateGenerator}
* {@link CandidateGenerator}
*
*/
@InterfaceAudience.Private

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class LoadCandidateGenerator extends CandidateGenerator {
@Override
BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
cluster.sortServersByRegionCount();
int thisServer = pickMostLoadedServer(cluster, -1);
int otherServer = pickLeastLoadedServer(cluster, thisServer);
return pickRandomRegions(cluster, thisServer, otherServer);
}
private int pickLeastLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) {
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
int index = 0;
while (servers[index] == null || servers[index] == thisServer) {
index++;
if (index == servers.length) {
return -1;
}
}
return servers[index];
}
private int pickMostLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) {
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
int index = servers.length - 1;
while (servers[index] == null || servers[index] == thisServer) {
index--;
if (index < 0) {
return -1;
}
}
return servers[index];
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hbase.thirdparty.com.google.common.base.Optional;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class LocalityBasedCandidateGenerator extends CandidateGenerator {
private MasterServices masterServices;
LocalityBasedCandidateGenerator(MasterServices masterServices) {
this.masterServices = masterServices;
}
@Override
BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
if (this.masterServices == null) {
int thisServer = pickRandomServer(cluster);
// Pick the other server
int otherServer = pickOtherRandomServer(cluster, thisServer);
return pickRandomRegions(cluster, thisServer, otherServer);
}
// Randomly iterate through regions until you find one that is not on ideal host
for (int region : getRandomIterationOrder(cluster.numRegions)) {
int currentServer = cluster.regionIndexToServerIndex[region];
if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(
BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]) {
Optional<BaseLoadBalancer.Cluster.Action> potential = tryMoveOrSwap(cluster,
currentServer, region,
cluster.getOrComputeRegionsToMostLocalEntities(
BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]
);
if (potential.isPresent()) {
return potential.get();
}
}
}
return BaseLoadBalancer.Cluster.NullAction;
}
private Optional<BaseLoadBalancer.Cluster.Action> tryMoveOrSwap(BaseLoadBalancer.Cluster cluster,
int fromServer, int fromRegion, int toServer) {
// Try move first. We know apriori fromRegion has the highest locality on toServer
if (cluster.serverHasTooFewRegions(toServer)) {
return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
}
// Compare locality gain/loss from swapping fromRegion with regions on toServer
double fromRegionLocalityDelta = getWeightedLocality(cluster, fromRegion, toServer)
- getWeightedLocality(cluster, fromRegion, fromServer);
for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
double toRegionLocalityDelta = getWeightedLocality(cluster, toRegion, fromServer)
- getWeightedLocality(cluster, toRegion, toServer);
// If locality would remain neutral or improve, attempt the swap
if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
}
}
return Optional.absent();
}
private double getWeightedLocality(BaseLoadBalancer.Cluster cluster, int region, int server) {
return cluster.getOrComputeWeightedLocality(region, server,
BaseLoadBalancer.Cluster.LocalityType.SERVER);
}
void setServices(MasterServices services) {
this.masterServices = services;
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Generates candidates which moves the replicas out of the region server for
* co-hosted region replicas
*/
@InterfaceAudience.Private
class RegionReplicaCandidateGenerator extends CandidateGenerator {
StochasticLoadBalancer.RandomCandidateGenerator randomGenerator =
new StochasticLoadBalancer.RandomCandidateGenerator();
/**
* Randomly select one regionIndex out of all region replicas co-hosted in the same group
* (a group is a server, host or rack)
*
* @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
* primariesOfRegionsPerHost or primariesOfRegionsPerRack
* @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
* @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
* @return a regionIndex for the selected primary or -1 if there is no co-locating
*/
int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup,
int[] regionIndexToPrimaryIndex) {
int currentPrimary = -1;
int currentPrimaryIndex = -1;
int selectedPrimaryIndex = -1;
double currentLargestRandom = -1;
// primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
// ids for the regions hosted in server, a consecutive repetition means that replicas
// are co-hosted
for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
int primary = j < primariesOfRegionsPerGroup.length
? primariesOfRegionsPerGroup[j] : -1;
if (primary != currentPrimary) { // check for whether we see a new primary
int numReplicas = j - currentPrimaryIndex;
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
// decide to select this primary region id or not
double currentRandom = StochasticLoadBalancer.RANDOM.nextDouble();
// we don't know how many region replicas are co-hosted, we will randomly select one
// using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
if (currentRandom > currentLargestRandom) {
selectedPrimaryIndex = currentPrimary;
currentLargestRandom = currentRandom;
}
}
currentPrimary = primary;
currentPrimaryIndex = j;
}
}
// we have found the primary id for the region to move. Now find the actual regionIndex
// with the given primary, prefer to move the secondary region.
for (int regionIndex : regionsPerGroup) {
if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
// always move the secondary, not the primary
if (selectedPrimaryIndex != regionIndex) {
return regionIndex;
}
}
}
return -1;
}
@Override
BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
int serverIndex = pickRandomServer(cluster);
if (cluster.numServers <= 1 || serverIndex == -1) {
return BaseLoadBalancer.Cluster.NullAction;
}
int regionIndex = selectCoHostedRegionPerGroup(
cluster.primariesOfRegionsPerServer[serverIndex],
cluster.regionsPerServer[serverIndex],
cluster.regionIndexToPrimaryIndex);
// if there are no pairs of region replicas co-hosted, default to random generator
if (regionIndex == -1) {
// default to randompicker
return randomGenerator.generate(cluster);
}
int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
}
}

View File

@ -21,7 +21,6 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
@ -54,7 +53,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Optional;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@ -641,123 +639,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return total;
}
/** Generates a candidate action to be applied to the cluster for cost function search */
abstract static class CandidateGenerator {
abstract Cluster.Action generate(Cluster cluster);
/**
* From a list of regions pick a random one. Null can be returned which
* {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
* rather than swap.
*
* @param cluster The state of the cluster
* @param server index of the server
* @param chanceOfNoSwap Chance that this will decide to try a move rather
* than a swap.
* @return a random {@link RegionInfo} or null if an asymmetrical move is
* suggested.
*/
protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
// Check to see if this is just a move.
if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
// signal a move only.
return -1;
}
int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
return cluster.regionsPerServer[server][rand];
}
protected int pickRandomServer(Cluster cluster) {
if (cluster.numServers < 1) {
return -1;
}
return RANDOM.nextInt(cluster.numServers);
}
protected int pickRandomRack(Cluster cluster) {
if (cluster.numRacks < 1) {
return -1;
}
return RANDOM.nextInt(cluster.numRacks);
}
protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
if (cluster.numServers < 2) {
return -1;
}
while (true) {
int otherServerIndex = pickRandomServer(cluster);
if (otherServerIndex != serverIndex) {
return otherServerIndex;
}
}
}
protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
if (cluster.numRacks < 2) {
return -1;
}
while (true) {
int otherRackIndex = pickRandomRack(cluster);
if (otherRackIndex != rackIndex) {
return otherRackIndex;
}
}
}
protected Cluster.Action pickRandomRegions(Cluster cluster,
int thisServer,
int otherServer) {
if (thisServer < 0 || otherServer < 0) {
return Cluster.NullAction;
}
// Decide who is most likely to need another region
int thisRegionCount = cluster.getNumRegions(thisServer);
int otherRegionCount = cluster.getNumRegions(otherServer);
// Assign the chance based upon the above
double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
return getAction(thisServer, thisRegion, otherServer, otherRegion);
}
protected Cluster.Action getAction(int fromServer, int fromRegion,
int toServer, int toRegion) {
if (fromServer < 0 || toServer < 0) {
return Cluster.NullAction;
}
if (fromRegion > 0 && toRegion > 0) {
return new Cluster.SwapRegionsAction(fromServer, fromRegion,
toServer, toRegion);
} else if (fromRegion > 0) {
return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
} else if (toRegion > 0) {
return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
} else {
return Cluster.NullAction;
}
}
/**
* Returns a random iteration order of indexes of an array with size length
*/
protected List<Integer> getRandomIterationOrder(int length) {
ArrayList<Integer> order = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
order.add(i);
}
Collections.shuffle(order);
return order;
}
}
static class RandomCandidateGenerator extends CandidateGenerator {
@Override
@ -772,201 +653,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
}
static class LoadCandidateGenerator extends CandidateGenerator {
@Override
Cluster.Action generate(Cluster cluster) {
cluster.sortServersByRegionCount();
int thisServer = pickMostLoadedServer(cluster, -1);
int otherServer = pickLeastLoadedServer(cluster, thisServer);
return pickRandomRegions(cluster, thisServer, otherServer);
}
private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
int index = 0;
while (servers[index] == null || servers[index] == thisServer) {
index++;
if (index == servers.length) {
return -1;
}
}
return servers[index];
}
private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
int index = servers.length - 1;
while (servers[index] == null || servers[index] == thisServer) {
index--;
if (index < 0) {
return -1;
}
}
return servers[index];
}
}
static class LocalityBasedCandidateGenerator extends CandidateGenerator {
private MasterServices masterServices;
LocalityBasedCandidateGenerator(MasterServices masterServices) {
this.masterServices = masterServices;
}
@Override
Cluster.Action generate(Cluster cluster) {
if (this.masterServices == null) {
int thisServer = pickRandomServer(cluster);
// Pick the other server
int otherServer = pickOtherRandomServer(cluster, thisServer);
return pickRandomRegions(cluster, thisServer, otherServer);
}
// Randomly iterate through regions until you find one that is not on ideal host
for (int region : getRandomIterationOrder(cluster.numRegions)) {
int currentServer = cluster.regionIndexToServerIndex[region];
if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) {
Optional<Action> potential = tryMoveOrSwap(
cluster,
currentServer,
region,
cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]
);
if (potential.isPresent()) {
return potential.get();
}
}
}
return Cluster.NullAction;
}
/**
* Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved.
* Returns empty optional if no move can be found
*/
private Optional<Action> tryMoveOrSwap(Cluster cluster,
int fromServer,
int fromRegion,
int toServer) {
// Try move first. We know apriori fromRegion has the highest locality on toServer
if (cluster.serverHasTooFewRegions(toServer)) {
return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
}
// Compare locality gain/loss from swapping fromRegion with regions on toServer
double fromRegionLocalityDelta =
getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer);
for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
double toRegionLocalityDelta =
getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer);
// If locality would remain neutral or improve, attempt the swap
if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
}
}
return Optional.absent();
}
private double getWeightedLocality(Cluster cluster, int region, int server) {
return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER);
}
void setServices(MasterServices services) {
this.masterServices = services;
}
}
/**
* Generates candidates which moves the replicas out of the region server for
* co-hosted region replicas
*/
static class RegionReplicaCandidateGenerator extends CandidateGenerator {
RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
/**
* Randomly select one regionIndex out of all region replicas co-hosted in the same group
* (a group is a server, host or rack)
* @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
* primariesOfRegionsPerHost or primariesOfRegionsPerRack
* @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
* @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
* @return a regionIndex for the selected primary or -1 if there is no co-locating
*/
int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
, int[] regionIndexToPrimaryIndex) {
int currentPrimary = -1;
int currentPrimaryIndex = -1;
int selectedPrimaryIndex = -1;
double currentLargestRandom = -1;
// primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
// ids for the regions hosted in server, a consecutive repetition means that replicas
// are co-hosted
for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
int primary = j < primariesOfRegionsPerGroup.length
? primariesOfRegionsPerGroup[j] : -1;
if (primary != currentPrimary) { // check for whether we see a new primary
int numReplicas = j - currentPrimaryIndex;
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
// decide to select this primary region id or not
double currentRandom = RANDOM.nextDouble();
// we don't know how many region replicas are co-hosted, we will randomly select one
// using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
if (currentRandom > currentLargestRandom) {
selectedPrimaryIndex = currentPrimary;
currentLargestRandom = currentRandom;
}
}
currentPrimary = primary;
currentPrimaryIndex = j;
}
}
// we have found the primary id for the region to move. Now find the actual regionIndex
// with the given primary, prefer to move the secondary region.
for (int j = 0; j < regionsPerGroup.length; j++) {
int regionIndex = regionsPerGroup[j];
if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
// always move the secondary, not the primary
if (selectedPrimaryIndex != regionIndex) {
return regionIndex;
}
}
}
return -1;
}
@Override
Cluster.Action generate(Cluster cluster) {
int serverIndex = pickRandomServer(cluster);
if (cluster.numServers <= 1 || serverIndex == -1) {
return Cluster.NullAction;
}
int regionIndex = selectCoHostedRegionPerGroup(
cluster.primariesOfRegionsPerServer[serverIndex],
cluster.regionsPerServer[serverIndex],
cluster.regionIndexToPrimaryIndex);
// if there are no pairs of region replicas co-hosted, default to random generator
if (regionIndex == -1) {
// default to randompicker
return randomGenerator.generate(cluster);
}
int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
}
}
/**
* Generates candidates which moves the replicas out of the rack for
* co-hosted region replicas in the same rack