HBASE-24140 : Move CandidateGenerator and their implementors out of StochasticLoadBalancer (#1458)
Signed-off-by: Jan Hentschel <jan.hentschel@ultratendency.com>
This commit is contained in:
parent
a9210ef5fe
commit
b6b8c6973d
|
@ -0,0 +1,151 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Generates a candidate action to be applied to the cluster for cost function search
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
abstract class CandidateGenerator {
|
||||
|
||||
abstract BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster);
|
||||
|
||||
/**
|
||||
* From a list of regions pick a random one. Null can be returned which
|
||||
* {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
|
||||
* rather than swap.
|
||||
*
|
||||
* @param cluster The state of the cluster
|
||||
* @param server index of the server
|
||||
* @param chanceOfNoSwap Chance that this will decide to try a move rather
|
||||
* than a swap.
|
||||
* @return a random {@link RegionInfo} or null if an asymmetrical move is
|
||||
* suggested.
|
||||
*/
|
||||
int pickRandomRegion(BaseLoadBalancer.Cluster cluster, int server,
|
||||
double chanceOfNoSwap) {
|
||||
// Check to see if this is just a move.
|
||||
if (cluster.regionsPerServer[server].length == 0
|
||||
|| StochasticLoadBalancer.RANDOM.nextFloat() < chanceOfNoSwap) {
|
||||
// signal a move only.
|
||||
return -1;
|
||||
}
|
||||
int rand = StochasticLoadBalancer.RANDOM.nextInt(cluster.regionsPerServer[server].length);
|
||||
return cluster.regionsPerServer[server][rand];
|
||||
}
|
||||
|
||||
int pickRandomServer(BaseLoadBalancer.Cluster cluster) {
|
||||
if (cluster.numServers < 1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return StochasticLoadBalancer.RANDOM.nextInt(cluster.numServers);
|
||||
}
|
||||
|
||||
int pickRandomRack(BaseLoadBalancer.Cluster cluster) {
|
||||
if (cluster.numRacks < 1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return StochasticLoadBalancer.RANDOM.nextInt(cluster.numRacks);
|
||||
}
|
||||
|
||||
int pickOtherRandomServer(BaseLoadBalancer.Cluster cluster, int serverIndex) {
|
||||
if (cluster.numServers < 2) {
|
||||
return -1;
|
||||
}
|
||||
while (true) {
|
||||
int otherServerIndex = pickRandomServer(cluster);
|
||||
if (otherServerIndex != serverIndex) {
|
||||
return otherServerIndex;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int pickOtherRandomRack(BaseLoadBalancer.Cluster cluster, int rackIndex) {
|
||||
if (cluster.numRacks < 2) {
|
||||
return -1;
|
||||
}
|
||||
while (true) {
|
||||
int otherRackIndex = pickRandomRack(cluster);
|
||||
if (otherRackIndex != rackIndex) {
|
||||
return otherRackIndex;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BaseLoadBalancer.Cluster.Action pickRandomRegions(BaseLoadBalancer.Cluster cluster,
|
||||
int thisServer, int otherServer) {
|
||||
if (thisServer < 0 || otherServer < 0) {
|
||||
return BaseLoadBalancer.Cluster.NullAction;
|
||||
}
|
||||
|
||||
// Decide who is most likely to need another region
|
||||
int thisRegionCount = cluster.getNumRegions(thisServer);
|
||||
int otherRegionCount = cluster.getNumRegions(otherServer);
|
||||
|
||||
// Assign the chance based upon the above
|
||||
double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
|
||||
double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
|
||||
|
||||
int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
|
||||
int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
|
||||
|
||||
return getAction(thisServer, thisRegion, otherServer, otherRegion);
|
||||
}
|
||||
|
||||
protected BaseLoadBalancer.Cluster.Action getAction(int fromServer, int fromRegion,
|
||||
int toServer, int toRegion) {
|
||||
if (fromServer < 0 || toServer < 0) {
|
||||
return BaseLoadBalancer.Cluster.NullAction;
|
||||
}
|
||||
if (fromRegion > 0 && toRegion > 0) {
|
||||
return new BaseLoadBalancer.Cluster.SwapRegionsAction(fromServer, fromRegion,
|
||||
toServer, toRegion);
|
||||
} else if (fromRegion > 0) {
|
||||
return new BaseLoadBalancer.Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
|
||||
} else if (toRegion > 0) {
|
||||
return new BaseLoadBalancer.Cluster.MoveRegionAction(toRegion, toServer, fromServer);
|
||||
} else {
|
||||
return BaseLoadBalancer.Cluster.NullAction;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a random iteration order of indexes of an array with size length
|
||||
*/
|
||||
List<Integer> getRandomIterationOrder(int length) {
|
||||
ArrayList<Integer> order = new ArrayList<>(length);
|
||||
for (int i = 0; i < length; i++) {
|
||||
order.add(i);
|
||||
}
|
||||
Collections.shuffle(order);
|
||||
return order;
|
||||
}
|
||||
|
||||
}
|
|
@ -65,7 +65,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
|||
* RegionServer as the new Primary RegionServer) after a region is recovered. This
|
||||
* should help provide consistent read latencies for the regions even when their
|
||||
* primary region servers die. This provides two
|
||||
* {@link org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CandidateGenerator}
|
||||
* {@link CandidateGenerator}
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
class LoadCandidateGenerator extends CandidateGenerator {
|
||||
|
||||
@Override
|
||||
BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
|
||||
cluster.sortServersByRegionCount();
|
||||
int thisServer = pickMostLoadedServer(cluster, -1);
|
||||
int otherServer = pickLeastLoadedServer(cluster, thisServer);
|
||||
return pickRandomRegions(cluster, thisServer, otherServer);
|
||||
}
|
||||
|
||||
private int pickLeastLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) {
|
||||
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
|
||||
|
||||
int index = 0;
|
||||
while (servers[index] == null || servers[index] == thisServer) {
|
||||
index++;
|
||||
if (index == servers.length) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return servers[index];
|
||||
}
|
||||
|
||||
private int pickMostLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) {
|
||||
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
|
||||
|
||||
int index = servers.length - 1;
|
||||
while (servers[index] == null || servers[index] == thisServer) {
|
||||
index--;
|
||||
if (index < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return servers[index];
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Optional;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
class LocalityBasedCandidateGenerator extends CandidateGenerator {
|
||||
|
||||
private MasterServices masterServices;
|
||||
|
||||
LocalityBasedCandidateGenerator(MasterServices masterServices) {
|
||||
this.masterServices = masterServices;
|
||||
}
|
||||
|
||||
@Override
|
||||
BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
|
||||
if (this.masterServices == null) {
|
||||
int thisServer = pickRandomServer(cluster);
|
||||
// Pick the other server
|
||||
int otherServer = pickOtherRandomServer(cluster, thisServer);
|
||||
return pickRandomRegions(cluster, thisServer, otherServer);
|
||||
}
|
||||
|
||||
// Randomly iterate through regions until you find one that is not on ideal host
|
||||
for (int region : getRandomIterationOrder(cluster.numRegions)) {
|
||||
int currentServer = cluster.regionIndexToServerIndex[region];
|
||||
if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(
|
||||
BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]) {
|
||||
Optional<BaseLoadBalancer.Cluster.Action> potential = tryMoveOrSwap(cluster,
|
||||
currentServer, region,
|
||||
cluster.getOrComputeRegionsToMostLocalEntities(
|
||||
BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]
|
||||
);
|
||||
if (potential.isPresent()) {
|
||||
return potential.get();
|
||||
}
|
||||
}
|
||||
}
|
||||
return BaseLoadBalancer.Cluster.NullAction;
|
||||
}
|
||||
|
||||
private Optional<BaseLoadBalancer.Cluster.Action> tryMoveOrSwap(BaseLoadBalancer.Cluster cluster,
|
||||
int fromServer, int fromRegion, int toServer) {
|
||||
// Try move first. We know apriori fromRegion has the highest locality on toServer
|
||||
if (cluster.serverHasTooFewRegions(toServer)) {
|
||||
return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
|
||||
}
|
||||
// Compare locality gain/loss from swapping fromRegion with regions on toServer
|
||||
double fromRegionLocalityDelta = getWeightedLocality(cluster, fromRegion, toServer)
|
||||
- getWeightedLocality(cluster, fromRegion, fromServer);
|
||||
for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
|
||||
int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
|
||||
double toRegionLocalityDelta = getWeightedLocality(cluster, toRegion, fromServer)
|
||||
- getWeightedLocality(cluster, toRegion, toServer);
|
||||
// If locality would remain neutral or improve, attempt the swap
|
||||
if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
|
||||
return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
|
||||
}
|
||||
}
|
||||
return Optional.absent();
|
||||
}
|
||||
|
||||
private double getWeightedLocality(BaseLoadBalancer.Cluster cluster, int region, int server) {
|
||||
return cluster.getOrComputeWeightedLocality(region, server,
|
||||
BaseLoadBalancer.Cluster.LocalityType.SERVER);
|
||||
}
|
||||
|
||||
void setServices(MasterServices services) {
|
||||
this.masterServices = services;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Generates candidates which moves the replicas out of the region server for
|
||||
* co-hosted region replicas
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class RegionReplicaCandidateGenerator extends CandidateGenerator {
|
||||
|
||||
StochasticLoadBalancer.RandomCandidateGenerator randomGenerator =
|
||||
new StochasticLoadBalancer.RandomCandidateGenerator();
|
||||
|
||||
/**
|
||||
* Randomly select one regionIndex out of all region replicas co-hosted in the same group
|
||||
* (a group is a server, host or rack)
|
||||
*
|
||||
* @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
|
||||
* primariesOfRegionsPerHost or primariesOfRegionsPerRack
|
||||
* @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
|
||||
* @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
|
||||
* @return a regionIndex for the selected primary or -1 if there is no co-locating
|
||||
*/
|
||||
int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup,
|
||||
int[] regionIndexToPrimaryIndex) {
|
||||
int currentPrimary = -1;
|
||||
int currentPrimaryIndex = -1;
|
||||
int selectedPrimaryIndex = -1;
|
||||
double currentLargestRandom = -1;
|
||||
// primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
|
||||
// ids for the regions hosted in server, a consecutive repetition means that replicas
|
||||
// are co-hosted
|
||||
for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
|
||||
int primary = j < primariesOfRegionsPerGroup.length
|
||||
? primariesOfRegionsPerGroup[j] : -1;
|
||||
if (primary != currentPrimary) { // check for whether we see a new primary
|
||||
int numReplicas = j - currentPrimaryIndex;
|
||||
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
|
||||
// decide to select this primary region id or not
|
||||
double currentRandom = StochasticLoadBalancer.RANDOM.nextDouble();
|
||||
// we don't know how many region replicas are co-hosted, we will randomly select one
|
||||
// using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
|
||||
if (currentRandom > currentLargestRandom) {
|
||||
selectedPrimaryIndex = currentPrimary;
|
||||
currentLargestRandom = currentRandom;
|
||||
}
|
||||
}
|
||||
currentPrimary = primary;
|
||||
currentPrimaryIndex = j;
|
||||
}
|
||||
}
|
||||
|
||||
// we have found the primary id for the region to move. Now find the actual regionIndex
|
||||
// with the given primary, prefer to move the secondary region.
|
||||
for (int regionIndex : regionsPerGroup) {
|
||||
if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
|
||||
// always move the secondary, not the primary
|
||||
if (selectedPrimaryIndex != regionIndex) {
|
||||
return regionIndex;
|
||||
}
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
|
||||
int serverIndex = pickRandomServer(cluster);
|
||||
if (cluster.numServers <= 1 || serverIndex == -1) {
|
||||
return BaseLoadBalancer.Cluster.NullAction;
|
||||
}
|
||||
|
||||
int regionIndex = selectCoHostedRegionPerGroup(
|
||||
cluster.primariesOfRegionsPerServer[serverIndex],
|
||||
cluster.regionsPerServer[serverIndex],
|
||||
cluster.regionIndexToPrimaryIndex);
|
||||
|
||||
// if there are no pairs of region replicas co-hosted, default to random generator
|
||||
if (regionIndex == -1) {
|
||||
// default to randompicker
|
||||
return randomGenerator.generate(cluster);
|
||||
}
|
||||
|
||||
int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
|
||||
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
|
||||
return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
|
||||
}
|
||||
|
||||
}
|
|
@ -21,7 +21,6 @@ import java.util.ArrayDeque;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
|
@ -54,7 +53,6 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Optional;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
|
||||
|
@ -642,123 +640,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
return total;
|
||||
}
|
||||
|
||||
/** Generates a candidate action to be applied to the cluster for cost function search */
|
||||
abstract static class CandidateGenerator {
|
||||
abstract Cluster.Action generate(Cluster cluster);
|
||||
|
||||
/**
|
||||
* From a list of regions pick a random one. Null can be returned which
|
||||
* {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
|
||||
* rather than swap.
|
||||
*
|
||||
* @param cluster The state of the cluster
|
||||
* @param server index of the server
|
||||
* @param chanceOfNoSwap Chance that this will decide to try a move rather
|
||||
* than a swap.
|
||||
* @return a random {@link RegionInfo} or null if an asymmetrical move is
|
||||
* suggested.
|
||||
*/
|
||||
protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
|
||||
// Check to see if this is just a move.
|
||||
if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
|
||||
// signal a move only.
|
||||
return -1;
|
||||
}
|
||||
int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
|
||||
return cluster.regionsPerServer[server][rand];
|
||||
|
||||
}
|
||||
protected int pickRandomServer(Cluster cluster) {
|
||||
if (cluster.numServers < 1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return RANDOM.nextInt(cluster.numServers);
|
||||
}
|
||||
|
||||
protected int pickRandomRack(Cluster cluster) {
|
||||
if (cluster.numRacks < 1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return RANDOM.nextInt(cluster.numRacks);
|
||||
}
|
||||
|
||||
protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
|
||||
if (cluster.numServers < 2) {
|
||||
return -1;
|
||||
}
|
||||
while (true) {
|
||||
int otherServerIndex = pickRandomServer(cluster);
|
||||
if (otherServerIndex != serverIndex) {
|
||||
return otherServerIndex;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
|
||||
if (cluster.numRacks < 2) {
|
||||
return -1;
|
||||
}
|
||||
while (true) {
|
||||
int otherRackIndex = pickRandomRack(cluster);
|
||||
if (otherRackIndex != rackIndex) {
|
||||
return otherRackIndex;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected Cluster.Action pickRandomRegions(Cluster cluster,
|
||||
int thisServer,
|
||||
int otherServer) {
|
||||
if (thisServer < 0 || otherServer < 0) {
|
||||
return Cluster.NullAction;
|
||||
}
|
||||
|
||||
// Decide who is most likely to need another region
|
||||
int thisRegionCount = cluster.getNumRegions(thisServer);
|
||||
int otherRegionCount = cluster.getNumRegions(otherServer);
|
||||
|
||||
// Assign the chance based upon the above
|
||||
double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
|
||||
double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
|
||||
|
||||
int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
|
||||
int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
|
||||
|
||||
return getAction(thisServer, thisRegion, otherServer, otherRegion);
|
||||
}
|
||||
|
||||
protected Cluster.Action getAction(int fromServer, int fromRegion,
|
||||
int toServer, int toRegion) {
|
||||
if (fromServer < 0 || toServer < 0) {
|
||||
return Cluster.NullAction;
|
||||
}
|
||||
if (fromRegion > 0 && toRegion > 0) {
|
||||
return new Cluster.SwapRegionsAction(fromServer, fromRegion,
|
||||
toServer, toRegion);
|
||||
} else if (fromRegion > 0) {
|
||||
return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
|
||||
} else if (toRegion > 0) {
|
||||
return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
|
||||
} else {
|
||||
return Cluster.NullAction;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a random iteration order of indexes of an array with size length
|
||||
*/
|
||||
protected List<Integer> getRandomIterationOrder(int length) {
|
||||
ArrayList<Integer> order = new ArrayList<>(length);
|
||||
for (int i = 0; i < length; i++) {
|
||||
order.add(i);
|
||||
}
|
||||
Collections.shuffle(order);
|
||||
return order;
|
||||
}
|
||||
}
|
||||
|
||||
static class RandomCandidateGenerator extends CandidateGenerator {
|
||||
|
||||
@Override
|
||||
|
@ -773,201 +654,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
static class LoadCandidateGenerator extends CandidateGenerator {
|
||||
|
||||
@Override
|
||||
Cluster.Action generate(Cluster cluster) {
|
||||
cluster.sortServersByRegionCount();
|
||||
int thisServer = pickMostLoadedServer(cluster, -1);
|
||||
int otherServer = pickLeastLoadedServer(cluster, thisServer);
|
||||
|
||||
return pickRandomRegions(cluster, thisServer, otherServer);
|
||||
}
|
||||
|
||||
private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
|
||||
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
|
||||
|
||||
int index = 0;
|
||||
while (servers[index] == null || servers[index] == thisServer) {
|
||||
index++;
|
||||
if (index == servers.length) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return servers[index];
|
||||
}
|
||||
|
||||
private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
|
||||
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
|
||||
|
||||
int index = servers.length - 1;
|
||||
while (servers[index] == null || servers[index] == thisServer) {
|
||||
index--;
|
||||
if (index < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return servers[index];
|
||||
}
|
||||
}
|
||||
|
||||
static class LocalityBasedCandidateGenerator extends CandidateGenerator {
|
||||
|
||||
private MasterServices masterServices;
|
||||
|
||||
LocalityBasedCandidateGenerator(MasterServices masterServices) {
|
||||
this.masterServices = masterServices;
|
||||
}
|
||||
|
||||
@Override
|
||||
Cluster.Action generate(Cluster cluster) {
|
||||
if (this.masterServices == null) {
|
||||
int thisServer = pickRandomServer(cluster);
|
||||
// Pick the other server
|
||||
int otherServer = pickOtherRandomServer(cluster, thisServer);
|
||||
return pickRandomRegions(cluster, thisServer, otherServer);
|
||||
}
|
||||
|
||||
// Randomly iterate through regions until you find one that is not on ideal host
|
||||
for (int region : getRandomIterationOrder(cluster.numRegions)) {
|
||||
int currentServer = cluster.regionIndexToServerIndex[region];
|
||||
if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) {
|
||||
Optional<Action> potential = tryMoveOrSwap(
|
||||
cluster,
|
||||
currentServer,
|
||||
region,
|
||||
cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]
|
||||
);
|
||||
if (potential.isPresent()) {
|
||||
return potential.get();
|
||||
}
|
||||
}
|
||||
}
|
||||
return Cluster.NullAction;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved.
|
||||
* Returns empty optional if no move can be found
|
||||
*/
|
||||
private Optional<Action> tryMoveOrSwap(Cluster cluster,
|
||||
int fromServer,
|
||||
int fromRegion,
|
||||
int toServer) {
|
||||
// Try move first. We know apriori fromRegion has the highest locality on toServer
|
||||
if (cluster.serverHasTooFewRegions(toServer)) {
|
||||
return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
|
||||
}
|
||||
|
||||
// Compare locality gain/loss from swapping fromRegion with regions on toServer
|
||||
double fromRegionLocalityDelta =
|
||||
getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer);
|
||||
for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
|
||||
int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
|
||||
double toRegionLocalityDelta =
|
||||
getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer);
|
||||
// If locality would remain neutral or improve, attempt the swap
|
||||
if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
|
||||
return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
|
||||
}
|
||||
}
|
||||
|
||||
return Optional.absent();
|
||||
}
|
||||
|
||||
private double getWeightedLocality(Cluster cluster, int region, int server) {
|
||||
return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER);
|
||||
}
|
||||
|
||||
void setServices(MasterServices services) {
|
||||
this.masterServices = services;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates candidates which moves the replicas out of the region server for
|
||||
* co-hosted region replicas
|
||||
*/
|
||||
static class RegionReplicaCandidateGenerator extends CandidateGenerator {
|
||||
|
||||
RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
|
||||
|
||||
/**
|
||||
* Randomly select one regionIndex out of all region replicas co-hosted in the same group
|
||||
* (a group is a server, host or rack)
|
||||
* @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
|
||||
* primariesOfRegionsPerHost or primariesOfRegionsPerRack
|
||||
* @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
|
||||
* @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
|
||||
* @return a regionIndex for the selected primary or -1 if there is no co-locating
|
||||
*/
|
||||
int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
|
||||
, int[] regionIndexToPrimaryIndex) {
|
||||
int currentPrimary = -1;
|
||||
int currentPrimaryIndex = -1;
|
||||
int selectedPrimaryIndex = -1;
|
||||
double currentLargestRandom = -1;
|
||||
// primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
|
||||
// ids for the regions hosted in server, a consecutive repetition means that replicas
|
||||
// are co-hosted
|
||||
for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
|
||||
int primary = j < primariesOfRegionsPerGroup.length
|
||||
? primariesOfRegionsPerGroup[j] : -1;
|
||||
if (primary != currentPrimary) { // check for whether we see a new primary
|
||||
int numReplicas = j - currentPrimaryIndex;
|
||||
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
|
||||
// decide to select this primary region id or not
|
||||
double currentRandom = RANDOM.nextDouble();
|
||||
// we don't know how many region replicas are co-hosted, we will randomly select one
|
||||
// using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
|
||||
if (currentRandom > currentLargestRandom) {
|
||||
selectedPrimaryIndex = currentPrimary;
|
||||
currentLargestRandom = currentRandom;
|
||||
}
|
||||
}
|
||||
currentPrimary = primary;
|
||||
currentPrimaryIndex = j;
|
||||
}
|
||||
}
|
||||
|
||||
// we have found the primary id for the region to move. Now find the actual regionIndex
|
||||
// with the given primary, prefer to move the secondary region.
|
||||
for (int j = 0; j < regionsPerGroup.length; j++) {
|
||||
int regionIndex = regionsPerGroup[j];
|
||||
if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
|
||||
// always move the secondary, not the primary
|
||||
if (selectedPrimaryIndex != regionIndex) {
|
||||
return regionIndex;
|
||||
}
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
Cluster.Action generate(Cluster cluster) {
|
||||
int serverIndex = pickRandomServer(cluster);
|
||||
if (cluster.numServers <= 1 || serverIndex == -1) {
|
||||
return Cluster.NullAction;
|
||||
}
|
||||
|
||||
int regionIndex = selectCoHostedRegionPerGroup(
|
||||
cluster.primariesOfRegionsPerServer[serverIndex],
|
||||
cluster.regionsPerServer[serverIndex],
|
||||
cluster.regionIndexToPrimaryIndex);
|
||||
|
||||
// if there are no pairs of region replicas co-hosted, default to random generator
|
||||
if (regionIndex == -1) {
|
||||
// default to randompicker
|
||||
return randomGenerator.generate(cluster);
|
||||
}
|
||||
|
||||
int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
|
||||
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
|
||||
return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates candidates which moves the replicas out of the rack for
|
||||
* co-hosted region replicas in the same rack
|
||||
|
|
Loading…
Reference in New Issue