From 45622abe2fc63c75ade5679a5b34971aef06eaf7 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Sun, 12 Apr 2020 19:13:47 +0530 Subject: [PATCH] HBASE-24140 : Move CandidateGenerator and their implementors out of StochasticLoadBalancer (#1458) Signed-off-by: Jan Hentschel --- .../master/balancer/CandidateGenerator.java | 151 +++++++++ .../balancer/FavoredStochasticBalancer.java | 2 +- .../balancer/LoadCandidateGenerator.java | 60 ++++ .../LocalityBasedCandidateGenerator.java | 93 ++++++ .../RegionReplicaCandidateGenerator.java | 108 ++++++ .../balancer/StochasticLoadBalancer.java | 314 ------------------ 6 files changed, 413 insertions(+), 315 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LocalityBasedCandidateGenerator.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java new file mode 100644 index 00000000000..77bbcf9ef70 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.balancer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.client.RegionInfo; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Generates a candidate action to be applied to the cluster for cost function search + */ +@InterfaceAudience.Private +abstract class CandidateGenerator { + + abstract BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster); + + /** + * From a list of regions pick a random one. Null can be returned which + * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move + * rather than swap. + * + * @param cluster The state of the cluster + * @param server index of the server + * @param chanceOfNoSwap Chance that this will decide to try a move rather + * than a swap. + * @return a random {@link RegionInfo} or null if an asymmetrical move is + * suggested. + */ + int pickRandomRegion(BaseLoadBalancer.Cluster cluster, int server, + double chanceOfNoSwap) { + // Check to see if this is just a move. + if (cluster.regionsPerServer[server].length == 0 + || StochasticLoadBalancer.RANDOM.nextFloat() < chanceOfNoSwap) { + // signal a move only. + return -1; + } + int rand = StochasticLoadBalancer.RANDOM.nextInt(cluster.regionsPerServer[server].length); + return cluster.regionsPerServer[server][rand]; + } + + int pickRandomServer(BaseLoadBalancer.Cluster cluster) { + if (cluster.numServers < 1) { + return -1; + } + + return StochasticLoadBalancer.RANDOM.nextInt(cluster.numServers); + } + + int pickRandomRack(BaseLoadBalancer.Cluster cluster) { + if (cluster.numRacks < 1) { + return -1; + } + + return StochasticLoadBalancer.RANDOM.nextInt(cluster.numRacks); + } + + int pickOtherRandomServer(BaseLoadBalancer.Cluster cluster, int serverIndex) { + if (cluster.numServers < 2) { + return -1; + } + while (true) { + int otherServerIndex = pickRandomServer(cluster); + if (otherServerIndex != serverIndex) { + return otherServerIndex; + } + } + } + + int pickOtherRandomRack(BaseLoadBalancer.Cluster cluster, int rackIndex) { + if (cluster.numRacks < 2) { + return -1; + } + while (true) { + int otherRackIndex = pickRandomRack(cluster); + if (otherRackIndex != rackIndex) { + return otherRackIndex; + } + } + } + + BaseLoadBalancer.Cluster.Action pickRandomRegions(BaseLoadBalancer.Cluster cluster, + int thisServer, int otherServer) { + if (thisServer < 0 || otherServer < 0) { + return BaseLoadBalancer.Cluster.NullAction; + } + + // Decide who is most likely to need another region + int thisRegionCount = cluster.getNumRegions(thisServer); + int otherRegionCount = cluster.getNumRegions(otherServer); + + // Assign the chance based upon the above + double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5; + double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5; + + int thisRegion = pickRandomRegion(cluster, thisServer, thisChance); + int otherRegion = pickRandomRegion(cluster, otherServer, otherChance); + + return getAction(thisServer, thisRegion, otherServer, otherRegion); + } + + protected BaseLoadBalancer.Cluster.Action getAction(int fromServer, int fromRegion, + int toServer, int toRegion) { + if (fromServer < 0 || toServer < 0) { + return BaseLoadBalancer.Cluster.NullAction; + } + if (fromRegion > 0 && toRegion > 0) { + return new BaseLoadBalancer.Cluster.SwapRegionsAction(fromServer, fromRegion, + toServer, toRegion); + } else if (fromRegion > 0) { + return new BaseLoadBalancer.Cluster.MoveRegionAction(fromRegion, fromServer, toServer); + } else if (toRegion > 0) { + return new BaseLoadBalancer.Cluster.MoveRegionAction(toRegion, toServer, fromServer); + } else { + return BaseLoadBalancer.Cluster.NullAction; + } + } + + /** + * Returns a random iteration order of indexes of an array with size length + */ + List getRandomIterationOrder(int length) { + ArrayList order = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + order.add(i); + } + Collections.shuffle(order); + return order; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java index add0f1c70f8..5fb3af74629 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java @@ -65,7 +65,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Sets; * RegionServer as the new Primary RegionServer) after a region is recovered. This * should help provide consistent read latencies for the regions even when their * primary region servers die. This provides two - * {@link org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CandidateGenerator} + * {@link CandidateGenerator} * */ @InterfaceAudience.Private diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java new file mode 100644 index 00000000000..d60065feeb0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.balancer; + +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +class LoadCandidateGenerator extends CandidateGenerator { + + @Override + BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) { + cluster.sortServersByRegionCount(); + int thisServer = pickMostLoadedServer(cluster, -1); + int otherServer = pickLeastLoadedServer(cluster, thisServer); + return pickRandomRegions(cluster, thisServer, otherServer); + } + + private int pickLeastLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) { + Integer[] servers = cluster.serverIndicesSortedByRegionCount; + + int index = 0; + while (servers[index] == null || servers[index] == thisServer) { + index++; + if (index == servers.length) { + return -1; + } + } + return servers[index]; + } + + private int pickMostLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) { + Integer[] servers = cluster.serverIndicesSortedByRegionCount; + + int index = servers.length - 1; + while (servers[index] == null || servers[index] == thisServer) { + index--; + if (index < 0) { + return -1; + } + } + return servers[index]; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LocalityBasedCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LocalityBasedCandidateGenerator.java new file mode 100644 index 00000000000..6afb86ff9e0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LocalityBasedCandidateGenerator.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.balancer; + +import org.apache.hadoop.hbase.master.MasterServices; + +import org.apache.hbase.thirdparty.com.google.common.base.Optional; + +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +class LocalityBasedCandidateGenerator extends CandidateGenerator { + + private MasterServices masterServices; + + LocalityBasedCandidateGenerator(MasterServices masterServices) { + this.masterServices = masterServices; + } + + @Override + BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) { + if (this.masterServices == null) { + int thisServer = pickRandomServer(cluster); + // Pick the other server + int otherServer = pickOtherRandomServer(cluster, thisServer); + return pickRandomRegions(cluster, thisServer, otherServer); + } + + // Randomly iterate through regions until you find one that is not on ideal host + for (int region : getRandomIterationOrder(cluster.numRegions)) { + int currentServer = cluster.regionIndexToServerIndex[region]; + if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities( + BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]) { + Optional potential = tryMoveOrSwap(cluster, + currentServer, region, + cluster.getOrComputeRegionsToMostLocalEntities( + BaseLoadBalancer.Cluster.LocalityType.SERVER)[region] + ); + if (potential.isPresent()) { + return potential.get(); + } + } + } + return BaseLoadBalancer.Cluster.NullAction; + } + + private Optional tryMoveOrSwap(BaseLoadBalancer.Cluster cluster, + int fromServer, int fromRegion, int toServer) { + // Try move first. We know apriori fromRegion has the highest locality on toServer + if (cluster.serverHasTooFewRegions(toServer)) { + return Optional.of(getAction(fromServer, fromRegion, toServer, -1)); + } + // Compare locality gain/loss from swapping fromRegion with regions on toServer + double fromRegionLocalityDelta = getWeightedLocality(cluster, fromRegion, toServer) + - getWeightedLocality(cluster, fromRegion, fromServer); + for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) { + int toRegion = cluster.regionsPerServer[toServer][toRegionIndex]; + double toRegionLocalityDelta = getWeightedLocality(cluster, toRegion, fromServer) + - getWeightedLocality(cluster, toRegion, toServer); + // If locality would remain neutral or improve, attempt the swap + if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) { + return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion)); + } + } + return Optional.absent(); + } + + private double getWeightedLocality(BaseLoadBalancer.Cluster cluster, int region, int server) { + return cluster.getOrComputeWeightedLocality(region, server, + BaseLoadBalancer.Cluster.LocalityType.SERVER); + } + + void setServices(MasterServices services) { + this.masterServices = services; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java new file mode 100644 index 00000000000..0a878fdf600 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.balancer; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Generates candidates which moves the replicas out of the region server for + * co-hosted region replicas + */ +@InterfaceAudience.Private +class RegionReplicaCandidateGenerator extends CandidateGenerator { + + StochasticLoadBalancer.RandomCandidateGenerator randomGenerator = + new StochasticLoadBalancer.RandomCandidateGenerator(); + + /** + * Randomly select one regionIndex out of all region replicas co-hosted in the same group + * (a group is a server, host or rack) + * + * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer, + * primariesOfRegionsPerHost or primariesOfRegionsPerRack + * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack + * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex + * @return a regionIndex for the selected primary or -1 if there is no co-locating + */ + int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup, + int[] regionIndexToPrimaryIndex) { + int currentPrimary = -1; + int currentPrimaryIndex = -1; + int selectedPrimaryIndex = -1; + double currentLargestRandom = -1; + // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region + // ids for the regions hosted in server, a consecutive repetition means that replicas + // are co-hosted + for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) { + int primary = j < primariesOfRegionsPerGroup.length + ? primariesOfRegionsPerGroup[j] : -1; + if (primary != currentPrimary) { // check for whether we see a new primary + int numReplicas = j - currentPrimaryIndex; + if (numReplicas > 1) { // means consecutive primaries, indicating co-location + // decide to select this primary region id or not + double currentRandom = StochasticLoadBalancer.RANDOM.nextDouble(); + // we don't know how many region replicas are co-hosted, we will randomly select one + // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html) + if (currentRandom > currentLargestRandom) { + selectedPrimaryIndex = currentPrimary; + currentLargestRandom = currentRandom; + } + } + currentPrimary = primary; + currentPrimaryIndex = j; + } + } + + // we have found the primary id for the region to move. Now find the actual regionIndex + // with the given primary, prefer to move the secondary region. + for (int regionIndex : regionsPerGroup) { + if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) { + // always move the secondary, not the primary + if (selectedPrimaryIndex != regionIndex) { + return regionIndex; + } + } + } + return -1; + } + + @Override + BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) { + int serverIndex = pickRandomServer(cluster); + if (cluster.numServers <= 1 || serverIndex == -1) { + return BaseLoadBalancer.Cluster.NullAction; + } + + int regionIndex = selectCoHostedRegionPerGroup( + cluster.primariesOfRegionsPerServer[serverIndex], + cluster.regionsPerServer[serverIndex], + cluster.regionIndexToPrimaryIndex); + + // if there are no pairs of region replicas co-hosted, default to random generator + if (regionIndex == -1) { + // default to randompicker + return randomGenerator.generate(cluster); + } + + int toServerIndex = pickOtherRandomServer(cluster, serverIndex); + int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f); + return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 66e68d58261..d64789a09a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -21,7 +21,6 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.LinkedList; @@ -54,7 +53,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.base.Optional; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -641,123 +639,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return total; } - /** Generates a candidate action to be applied to the cluster for cost function search */ - abstract static class CandidateGenerator { - abstract Cluster.Action generate(Cluster cluster); - - /** - * From a list of regions pick a random one. Null can be returned which - * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move - * rather than swap. - * - * @param cluster The state of the cluster - * @param server index of the server - * @param chanceOfNoSwap Chance that this will decide to try a move rather - * than a swap. - * @return a random {@link RegionInfo} or null if an asymmetrical move is - * suggested. - */ - protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) { - // Check to see if this is just a move. - if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) { - // signal a move only. - return -1; - } - int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length); - return cluster.regionsPerServer[server][rand]; - - } - protected int pickRandomServer(Cluster cluster) { - if (cluster.numServers < 1) { - return -1; - } - - return RANDOM.nextInt(cluster.numServers); - } - - protected int pickRandomRack(Cluster cluster) { - if (cluster.numRacks < 1) { - return -1; - } - - return RANDOM.nextInt(cluster.numRacks); - } - - protected int pickOtherRandomServer(Cluster cluster, int serverIndex) { - if (cluster.numServers < 2) { - return -1; - } - while (true) { - int otherServerIndex = pickRandomServer(cluster); - if (otherServerIndex != serverIndex) { - return otherServerIndex; - } - } - } - - protected int pickOtherRandomRack(Cluster cluster, int rackIndex) { - if (cluster.numRacks < 2) { - return -1; - } - while (true) { - int otherRackIndex = pickRandomRack(cluster); - if (otherRackIndex != rackIndex) { - return otherRackIndex; - } - } - } - - protected Cluster.Action pickRandomRegions(Cluster cluster, - int thisServer, - int otherServer) { - if (thisServer < 0 || otherServer < 0) { - return Cluster.NullAction; - } - - // Decide who is most likely to need another region - int thisRegionCount = cluster.getNumRegions(thisServer); - int otherRegionCount = cluster.getNumRegions(otherServer); - - // Assign the chance based upon the above - double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5; - double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5; - - int thisRegion = pickRandomRegion(cluster, thisServer, thisChance); - int otherRegion = pickRandomRegion(cluster, otherServer, otherChance); - - return getAction(thisServer, thisRegion, otherServer, otherRegion); - } - - protected Cluster.Action getAction(int fromServer, int fromRegion, - int toServer, int toRegion) { - if (fromServer < 0 || toServer < 0) { - return Cluster.NullAction; - } - if (fromRegion > 0 && toRegion > 0) { - return new Cluster.SwapRegionsAction(fromServer, fromRegion, - toServer, toRegion); - } else if (fromRegion > 0) { - return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer); - } else if (toRegion > 0) { - return new Cluster.MoveRegionAction(toRegion, toServer, fromServer); - } else { - return Cluster.NullAction; - } - } - - /** - * Returns a random iteration order of indexes of an array with size length - */ - protected List getRandomIterationOrder(int length) { - ArrayList order = new ArrayList<>(length); - for (int i = 0; i < length; i++) { - order.add(i); - } - Collections.shuffle(order); - return order; - } - } - static class RandomCandidateGenerator extends CandidateGenerator { @Override @@ -772,201 +653,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } - static class LoadCandidateGenerator extends CandidateGenerator { - - @Override - Cluster.Action generate(Cluster cluster) { - cluster.sortServersByRegionCount(); - int thisServer = pickMostLoadedServer(cluster, -1); - int otherServer = pickLeastLoadedServer(cluster, thisServer); - - return pickRandomRegions(cluster, thisServer, otherServer); - } - - private int pickLeastLoadedServer(final Cluster cluster, int thisServer) { - Integer[] servers = cluster.serverIndicesSortedByRegionCount; - - int index = 0; - while (servers[index] == null || servers[index] == thisServer) { - index++; - if (index == servers.length) { - return -1; - } - } - return servers[index]; - } - - private int pickMostLoadedServer(final Cluster cluster, int thisServer) { - Integer[] servers = cluster.serverIndicesSortedByRegionCount; - - int index = servers.length - 1; - while (servers[index] == null || servers[index] == thisServer) { - index--; - if (index < 0) { - return -1; - } - } - return servers[index]; - } - } - - static class LocalityBasedCandidateGenerator extends CandidateGenerator { - - private MasterServices masterServices; - - LocalityBasedCandidateGenerator(MasterServices masterServices) { - this.masterServices = masterServices; - } - - @Override - Cluster.Action generate(Cluster cluster) { - if (this.masterServices == null) { - int thisServer = pickRandomServer(cluster); - // Pick the other server - int otherServer = pickOtherRandomServer(cluster, thisServer); - return pickRandomRegions(cluster, thisServer, otherServer); - } - - // Randomly iterate through regions until you find one that is not on ideal host - for (int region : getRandomIterationOrder(cluster.numRegions)) { - int currentServer = cluster.regionIndexToServerIndex[region]; - if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) { - Optional potential = tryMoveOrSwap( - cluster, - currentServer, - region, - cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region] - ); - if (potential.isPresent()) { - return potential.get(); - } - } - } - return Cluster.NullAction; - } - - /** - * Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved. - * Returns empty optional if no move can be found - */ - private Optional tryMoveOrSwap(Cluster cluster, - int fromServer, - int fromRegion, - int toServer) { - // Try move first. We know apriori fromRegion has the highest locality on toServer - if (cluster.serverHasTooFewRegions(toServer)) { - return Optional.of(getAction(fromServer, fromRegion, toServer, -1)); - } - - // Compare locality gain/loss from swapping fromRegion with regions on toServer - double fromRegionLocalityDelta = - getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer); - for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) { - int toRegion = cluster.regionsPerServer[toServer][toRegionIndex]; - double toRegionLocalityDelta = - getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer); - // If locality would remain neutral or improve, attempt the swap - if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) { - return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion)); - } - } - - return Optional.absent(); - } - - private double getWeightedLocality(Cluster cluster, int region, int server) { - return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER); - } - - void setServices(MasterServices services) { - this.masterServices = services; - } - } - - /** - * Generates candidates which moves the replicas out of the region server for - * co-hosted region replicas - */ - static class RegionReplicaCandidateGenerator extends CandidateGenerator { - - RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator(); - - /** - * Randomly select one regionIndex out of all region replicas co-hosted in the same group - * (a group is a server, host or rack) - * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer, - * primariesOfRegionsPerHost or primariesOfRegionsPerRack - * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack - * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex - * @return a regionIndex for the selected primary or -1 if there is no co-locating - */ - int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup - , int[] regionIndexToPrimaryIndex) { - int currentPrimary = -1; - int currentPrimaryIndex = -1; - int selectedPrimaryIndex = -1; - double currentLargestRandom = -1; - // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region - // ids for the regions hosted in server, a consecutive repetition means that replicas - // are co-hosted - for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) { - int primary = j < primariesOfRegionsPerGroup.length - ? primariesOfRegionsPerGroup[j] : -1; - if (primary != currentPrimary) { // check for whether we see a new primary - int numReplicas = j - currentPrimaryIndex; - if (numReplicas > 1) { // means consecutive primaries, indicating co-location - // decide to select this primary region id or not - double currentRandom = RANDOM.nextDouble(); - // we don't know how many region replicas are co-hosted, we will randomly select one - // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html) - if (currentRandom > currentLargestRandom) { - selectedPrimaryIndex = currentPrimary; - currentLargestRandom = currentRandom; - } - } - currentPrimary = primary; - currentPrimaryIndex = j; - } - } - - // we have found the primary id for the region to move. Now find the actual regionIndex - // with the given primary, prefer to move the secondary region. - for (int j = 0; j < regionsPerGroup.length; j++) { - int regionIndex = regionsPerGroup[j]; - if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) { - // always move the secondary, not the primary - if (selectedPrimaryIndex != regionIndex) { - return regionIndex; - } - } - } - return -1; - } - - @Override - Cluster.Action generate(Cluster cluster) { - int serverIndex = pickRandomServer(cluster); - if (cluster.numServers <= 1 || serverIndex == -1) { - return Cluster.NullAction; - } - - int regionIndex = selectCoHostedRegionPerGroup( - cluster.primariesOfRegionsPerServer[serverIndex], - cluster.regionsPerServer[serverIndex], - cluster.regionIndexToPrimaryIndex); - - // if there are no pairs of region replicas co-hosted, default to random generator - if (regionIndex == -1) { - // default to randompicker - return randomGenerator.generate(cluster); - } - - int toServerIndex = pickOtherRandomServer(cluster, serverIndex); - int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f); - return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex); - } - } - /** * Generates candidates which moves the replicas out of the rack for * co-hosted region replicas in the same rack