From 4ebecd3eba5d0fa06e9cf39f11334cdd2dfd83c9 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 30 May 2012 20:53:15 +0000 Subject: [PATCH] HBASE-5959 Add other load balancers git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1344457 13f79535-47bb-0310-9956-ffa450edef68 --- hbase-server/pom.xml | 4 + .../apache/hadoop/hbase/master/HMaster.java | 3 + .../hadoop/hbase/master/LoadBalancer.java | 6 +- .../hadoop/hbase/master/RegionPlan.java | 16 + .../master/balancer/BaseLoadBalancer.java | 253 +++++++ .../master/balancer/ClusterLoadState.java | 86 +++ .../{ => balancer}/DefaultLoadBalancer.java | 433 ++--------- .../{ => balancer}/LoadBalancerFactory.java | 12 +- .../master/balancer/RegionInfoComparator.java | 38 + .../master/balancer/RegionLocationFinder.java | 194 +++++ .../master/{ => balancer}/ServerAndLoad.java | 15 +- .../balancer/StochasticLoadBalancer.java | 696 ++++++++++++++++++ .../hadoop/hbase/TestRegionRebalancing.java | 42 +- .../hbase/master/TestAssignmentManager.java | 8 +- .../hbase/master/TestDefaultLoadBalancer.java | 513 ------------- .../master/balancer/BalancerTestBase.java | 223 ++++++ .../master/balancer/TestBaseLoadBalancer.java | 231 ++++++ .../balancer/TestDefaultLoadBalancer.java | 133 ++++ .../balancer/TestStochasticLoadBalancer.java | 183 +++++ pom.xml | 11 +- 20 files changed, 2169 insertions(+), 931 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterLoadState.java rename hbase-server/src/main/java/org/apache/hadoop/hbase/master/{ => balancer}/DefaultLoadBalancer.java (54%) rename hbase-server/src/main/java/org/apache/hadoop/hbase/master/{ => balancer}/LoadBalancerFactory.java (83%) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionInfoComparator.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java rename hbase-server/src/main/java/org/apache/hadoop/hbase/master/{ => balancer}/ServerAndLoad.java (77%) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDefaultLoadBalancer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index 17a04892f28..8b41144e77c 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -337,6 +337,10 @@ commons-logging commons-logging + + org.apache.commons + commons-math + log4j log4j diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index e40d787ee01..9bb5d0ad3c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; 
import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.handler.CreateTableHandler; import org.apache.hadoop.hbase.master.handler.DeleteTableHandler; import org.apache.hadoop.hbase.master.handler.DisableTableHandler; @@ -1135,6 +1136,8 @@ Server { this.assignmentManager.getAssignmentsByTable(); List plans = new ArrayList(); + //Give the balancer the current cluster state. + this.balancer.setClusterStatus(getClusterStatus()); for (Map> assignments : assignmentsByTable.values()) { List partialPlans = this.balancer.balanceCluster(assignments); if (partialPlans != null) plans.addAll(partialPlans); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java index 135b94da9bd..5ef7df83fb3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java @@ -19,15 +19,15 @@ */ package org.apache.hadoop.hbase.master; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; -import java.util.List; -import java.util.Map; - /** * Makes decisions about the placement and movement of Regions across * RegionServers. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java index 6fbf9ab65c6..6d85dc81be9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.master; +import java.io.Serializable; +import java.util.Comparator; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; @@ -37,6 +40,19 @@ public class RegionPlan implements Comparable { private final ServerName source; private ServerName dest; + public static class RegionPlanComparator implements Comparator, Serializable { + + private static final long serialVersionUID = 4213207330485734853L; + + @Override + public int compare(RegionPlan l, RegionPlan r) { + long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId(); + if (diff < 0) return -1; + if (diff > 0) return 1; + return 0; + } + } + /** * Instantiate a plan for a region move, moving the specified region from * the specified source server to the specified destination server. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java new file mode 100644 index 00000000000..e40322fb90f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.master.MasterServices; + +import com.google.common.base.Joiner; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Sets; + +/** + * The base class for load balancers. It provides the the functions used to by + * {@link AssignmentManager} to assign regions in the edge cases. It doesn't + * provide an implementation of the actual balancing algorithm. + * + */ +public abstract class BaseLoadBalancer implements LoadBalancer { + + // slop for regions + private float slop; + private Configuration config; + private static final Random RANDOM = new Random(System.currentTimeMillis()); + private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class); + + protected MasterServices services; + + @Override + public void setConf(Configuration conf) { + this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2); + if (slop < 0) slop = 0; + else if (slop > 1) slop = 1; + this.config = conf; + } + + @Override + public Configuration getConf() { + return this.config; + } + + public void setClusterStatus(ClusterStatus st) { + // Not used except for the StocasticBalancer + } + + public void setMasterServices(MasterServices masterServices) { + this.services = masterServices; + } + + protected boolean needsBalance(ClusterLoadState cs) { + // Check if we even need to do any load balancing + float average = cs.getLoadAverage(); // for logging + // HBASE-3681 check sloppiness first + int floor = (int) Math.floor(average * (1 - slop)); + int ceiling = (int) Math.ceil(average * (1 + slop)); + + return cs.getMinLoad() > ceiling || cs.getMaxLoad() < floor; + } + + /** + * Generates a bulk assignment plan to be used on cluster startup using a + * simple round-robin assignment. + *

+ * Takes a list of all the regions and all the servers in the cluster and + * returns a map of each server to the regions that it should be assigned. + *

+ * Currently implemented as a round-robin assignment. Same invariant as load + * balancing, all servers holding floor(avg) or ceiling(avg). + * + * TODO: Use block locations from HDFS to place regions with their blocks + * + * @param regions all regions + * @param servers all servers + * @return map of server to the regions it should take, or null if no + * assignment is possible (ie. no regions or no servers) + */ + public Map> roundRobinAssignment(List regions, + List servers) { + if (regions.isEmpty() || servers.isEmpty()) { + return null; + } + Map> assignments = new TreeMap>(); + int numRegions = regions.size(); + int numServers = servers.size(); + int max = (int) Math.ceil((float) numRegions / numServers); + int serverIdx = 0; + if (numServers > 1) { + serverIdx = RANDOM.nextInt(numServers); + } + int regionIdx = 0; + for (int j = 0; j < numServers; j++) { + ServerName server = servers.get((j + serverIdx) % numServers); + List serverRegions = new ArrayList(max); + for (int i = regionIdx; i < numRegions; i += numServers) { + serverRegions.add(regions.get(i % numRegions)); + } + assignments.put(server, serverRegions); + regionIdx++; + } + return assignments; + } + + /** + * Generates an immediate assignment plan to be used by a new master for + * regions in transition that do not have an already known destination. + * + * Takes a list of regions that need immediate assignment and a list of all + * available servers. Returns a map of regions to the server they should be + * assigned to. + * + * This method will return quickly and does not do any intelligent balancing. + * The goal is to make a fast decision not the best decision possible. + * + * Currently this is random. + * + * @param regions + * @param servers + * @return map of regions to the server it should be assigned to + */ + public Map immediateAssignment(List regions, + List servers) { + Map assignments = new TreeMap(); + for (HRegionInfo region : regions) { + assignments.put(region, randomAssignment(region, servers)); + } + return assignments; + } + + /** + * Used to assign a single region to a random server. + */ + public ServerName randomAssignment(HRegionInfo regionInfo, List servers) { + if (servers == null || servers.isEmpty()) { + LOG.warn("Wanted to do random assignment but no servers to assign to"); + return null; + } + return servers.get(RANDOM.nextInt(servers.size())); + } + + /** + * Generates a bulk assignment startup plan, attempting to reuse the existing + * assignment information from META, but adjusting for the specified list of + * available/online servers available for assignment. + *

+ * Takes a map of all regions to their existing assignment from META. Also + * takes a list of online servers for regions to be assigned to. Attempts to + * retain all assignment, so in some instances initial assignment will not be + * completely balanced. + *

+ * Any leftover regions without an existing server to be assigned to will be + * assigned randomly to available servers. + * + * @param regions regions and existing assignment from meta + * @param servers available servers + * @return map of servers and regions to be assigned to them + */ + public Map> retainAssignment(Map regions, + List servers) { + // Group all of the old assignments by their hostname. + // We can't group directly by ServerName since the servers all have + // new start-codes. + + // Group the servers by their hostname. It's possible we have multiple + // servers on the same host on different ports. + ArrayListMultimap serversByHostname = ArrayListMultimap.create(); + for (ServerName server : servers) { + serversByHostname.put(server.getHostname(), server); + } + + // Now come up with new assignments + Map> assignments = new TreeMap>(); + + for (ServerName server : servers) { + assignments.put(server, new ArrayList()); + } + + // Collection of the hostnames that used to have regions + // assigned, but for which we no longer have any RS running + // after the cluster restart. + Set oldHostsNoLongerPresent = Sets.newTreeSet(); + + int numRandomAssignments = 0; + int numRetainedAssigments = 0; + for (Map.Entry entry : regions.entrySet()) { + HRegionInfo region = entry.getKey(); + ServerName oldServerName = entry.getValue(); + List localServers = new ArrayList(); + if (oldServerName != null) { + localServers = serversByHostname.get(oldServerName.getHostname()); + } + if (localServers.isEmpty()) { + // No servers on the new cluster match up with this hostname, + // assign randomly. + ServerName randomServer = servers.get(RANDOM.nextInt(servers.size())); + assignments.get(randomServer).add(region); + numRandomAssignments++; + if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname()); + } else if (localServers.size() == 1) { + // the usual case - one new server on same host + assignments.get(localServers.get(0)).add(region); + numRetainedAssigments++; + } else { + // multiple new servers in the cluster on this same host + int size = localServers.size(); + ServerName target = localServers.get(RANDOM.nextInt(size)); + assignments.get(target).add(region); + numRetainedAssigments++; + } + } + + String randomAssignMsg = ""; + if (numRandomAssignments > 0) { + randomAssignMsg = + numRandomAssignments + " regions were assigned " + + "to random hosts, since the old hosts for these regions are no " + + "longer present in the cluster. These hosts were:\n " + + Joiner.on("\n ").join(oldHostsNoLongerPresent); + } + + LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments + + " retained the pre-restart assignment. " + randomAssignMsg); + return assignments; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterLoadState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterLoadState.java new file mode 100644 index 00000000000..9bf4b59b49e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterLoadState.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; + +/** + * Class used to hold the current state of the cluster and how balanced it is. + */ +public class ClusterLoadState { + private final Map> clusterState; + private final NavigableMap> serversByLoad; + private boolean emptyRegionServerPresent = false; + private int numRegions = 0; + private int numServers = 0; + + public ClusterLoadState(Map> clusterState) { + super(); + this.numRegions = 0; + this.numServers = clusterState.size(); + this.clusterState = clusterState; + serversByLoad = new TreeMap>(); + // Iterate so we can count regions as we build the map + for (Map.Entry> server : clusterState.entrySet()) { + List regions = server.getValue(); + int sz = regions.size(); + if (sz == 0) emptyRegionServerPresent = true; + numRegions += sz; + serversByLoad.put(new ServerAndLoad(server.getKey(), sz), regions); + } + } + + Map> getClusterState() { + return clusterState; + } + + NavigableMap> getServersByLoad() { + return serversByLoad; + } + + boolean isEmptyRegionServerPresent() { + return emptyRegionServerPresent; + } + + int getNumRegions() { + return numRegions; + } + + int getNumServers() { + return numServers; + } + + float getLoadAverage() { + return (float) numRegions / numServers; + } + + int getMinLoad() { + return getServersByLoad().lastKey().getLoad(); + } + + int getMaxLoad() { + return getServersByLoad().firstKey().getLoad(); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DefaultLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DefaultLoadBalancer.java similarity index 54% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/master/DefaultLoadBalancer.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DefaultLoadBalancer.java index 7071b4d444b..b68901e8023 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DefaultLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DefaultLoadBalancer.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,41 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hbase.master; +package org.apache.hadoop.hbase.master.balancer; -import java.io.FileNotFoundException; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Random; -import java.util.Set; import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.RegionPlan; -import com.google.common.base.Joiner; -import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.MinMaxPriorityQueue; -import com.google.common.collect.Sets; /** * Makes decisions about the placement and movement of Regions across @@ -70,69 +54,45 @@ import com.google.common.collect.Sets; *

This classes produces plans for the {@link AssignmentManager} to execute. */ @InterfaceAudience.Private -public class DefaultLoadBalancer implements LoadBalancer { - private static final Log LOG = LogFactory.getLog(LoadBalancer.class); +public class DefaultLoadBalancer extends BaseLoadBalancer { + private static final Log LOG = LogFactory.getLog(DefaultLoadBalancer.class); private static final Random RANDOM = new Random(System.currentTimeMillis()); - // slop for regions - private float slop; - private Configuration config; - private ClusterStatus status; - private MasterServices services; - public void setClusterStatus(ClusterStatus st) { - this.status = st; - } - - public void setMasterServices(MasterServices masterServices) { - this.services = masterServices; - } - - @Override - public void setConf(Configuration conf) { - this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2); - if (slop < 0) slop = 0; - else if (slop > 1) slop = 1; - this.config = conf; - } - - @Override - public Configuration getConf() { - return this.config; - } - - /* - * The following comparator assumes that RegionId from HRegionInfo can - * represent the age of the region - larger RegionId means the region - * is younger. - * This comparator is used in balanceCluster() to account for the out-of-band - * regions which were assigned to the server after some other region server - * crashed. - */ - private static class RegionInfoComparator implements Comparator { - @Override - public int compare(HRegionInfo l, HRegionInfo r) { - long diff = r.getRegionId() - l.getRegionId(); - if (diff < 0) return -1; - if (diff > 0) return 1; - return 0; - } - } + private RegionInfoComparator riComparator = new RegionInfoComparator(); + private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator(); - RegionInfoComparator riComparator = new RegionInfoComparator(); - - private static class RegionPlanComparator implements Comparator { - @Override - public int compare(RegionPlan l, RegionPlan r) { - long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId(); - if (diff < 0) return -1; - if (diff > 0) return 1; - return 0; + /** + * Stores additional per-server information about the regions added/removed + * during the run of the balancing algorithm. + * + * For servers that shed regions, we need to track which regions we have already + * shed. nextRegionForUnload contains the index in the list of regions on + * the server that is the next to be shed. + */ + static class BalanceInfo { + + private final int nextRegionForUnload; + private int numRegionsAdded; + + public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) { + this.nextRegionForUnload = nextRegionForUnload; + this.numRegionsAdded = numRegionsAdded; + } + + int getNextRegionForUnload() { + return nextRegionForUnload; + } + + int getNumRegionsAdded() { + return numRegionsAdded; + } + + void setNumRegionsAdded(int numAdded) { + this.numRegionsAdded = numAdded; } } - RegionPlanComparator rpComparator = new RegionPlanComparator(); - /** * Generate a global load balancing plan according to the specified map of * server information to the most loaded regions of each server. 
@@ -219,34 +179,25 @@ public class DefaultLoadBalancer implements LoadBalancer { * or null if cluster is already balanced */ public List balanceCluster( - Map> clusterState) { + Map> clusterMap) { boolean emptyRegionServerPresent = false; long startTime = System.currentTimeMillis(); - int numServers = clusterState.size(); + + ClusterLoadState cs = new ClusterLoadState(clusterMap); + + int numServers = cs.getNumServers(); if (numServers == 0) { LOG.debug("numServers=0 so skipping load balancing"); return null; } - NavigableMap> serversByLoad = - new TreeMap>(); - int numRegions = 0; - // Iterate so we can count regions as we build the map - for (Map.Entry> server: clusterState.entrySet()) { - List regions = server.getValue(); - int sz = regions.size(); - if (sz == 0) emptyRegionServerPresent = true; - numRegions += sz; - serversByLoad.put(new ServerAndLoad(server.getKey(), sz), regions); - } - // Check if we even need to do any load balancing - float average = (float)numRegions / numServers; // for logging - // HBASE-3681 check sloppiness first - int floor = (int) Math.floor(average * (1 - slop)); - int ceiling = (int) Math.ceil(average * (1 + slop)); - if (serversByLoad.lastKey().getLoad() <= ceiling && - serversByLoad.firstKey().getLoad() >= floor) { + NavigableMap> serversByLoad = cs.getServersByLoad(); + + int numRegions = cs.getNumRegions(); + + if (!this.needsBalance(cs)) { // Skipped because no server outside (min,max) range + float average = cs.getLoadAverage(); // for logging LOG.info("Skipping load balancing because balanced cluster; " + "servers=" + numServers + " " + "regions=" + numRegions + " average=" + average + " " + @@ -254,6 +205,7 @@ public class DefaultLoadBalancer implements LoadBalancer { " leastloaded=" + serversByLoad.firstKey().getLoad()); return null; } + int min = numRegions / numServers; int max = numRegions % numServers == 0 ? min : min + 1; @@ -452,7 +404,7 @@ public class DefaultLoadBalancer implements LoadBalancer { ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded + ", serversUnderloaded=" + serversUnderloaded); StringBuilder sb = new StringBuilder(); - for (Map.Entry> e: clusterState.entrySet()) { + for (Map.Entry> e: clusterMap.entrySet()) { if (sb.length() > 0) sb.append(", "); sb.append(e.getKey().toString()); sb.append(" "); @@ -473,7 +425,7 @@ public class DefaultLoadBalancer implements LoadBalancer { /** * Add a region from the head or tail to the List of regions to return. */ - void addRegionPlan(final MinMaxPriorityQueue regionsToMove, + private void addRegionPlan(final MinMaxPriorityQueue regionsToMove, final boolean fetchFromTail, final ServerName sn, List regionsToReturn) { RegionPlan rp = null; if (!fetchFromTail) rp = regionsToMove.remove(); @@ -481,291 +433,4 @@ public class DefaultLoadBalancer implements LoadBalancer { rp.setDestination(sn); regionsToReturn.add(rp); } - - /** - * Stores additional per-server information about the regions added/removed - * during the run of the balancing algorithm. - * - * For servers that shed regions, we need to track which regions we have - * already shed. nextRegionForUnload contains the index in the list - * of regions on the server that is the next to be shed. 
- */ - private static class BalanceInfo { - - private final int nextRegionForUnload; - private int numRegionsAdded; - - public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) { - this.nextRegionForUnload = nextRegionForUnload; - this.numRegionsAdded = numRegionsAdded; - } - - public int getNextRegionForUnload() { - return nextRegionForUnload; - } - - public int getNumRegionsAdded() { - return numRegionsAdded; - } - - public void setNumRegionsAdded(int numAdded) { - this.numRegionsAdded = numAdded; - } - } - - /** - * Generates a bulk assignment plan to be used on cluster startup using a - * simple round-robin assignment. - *

- * Takes a list of all the regions and all the servers in the cluster and - * returns a map of each server to the regions that it should be assigned. - *

- * Currently implemented as a round-robin assignment. Same invariant as - * load balancing, all servers holding floor(avg) or ceiling(avg). - * - * TODO: Use block locations from HDFS to place regions with their blocks - * - * @param regions all regions - * @param servers all servers - * @return map of server to the regions it should take, or null if no - * assignment is possible (ie. no regions or no servers) - */ - public Map> roundRobinAssignment( - List regions, List servers) { - if (regions.isEmpty() || servers.isEmpty()) { - return null; - } - Map> assignments = - new TreeMap>(); - int numRegions = regions.size(); - int numServers = servers.size(); - int max = (int)Math.ceil((float)numRegions/numServers); - int serverIdx = 0; - if (numServers > 1) { - serverIdx = RANDOM.nextInt(numServers); - } - int regionIdx = 0; - for (int j = 0; j < numServers; j++) { - ServerName server = servers.get((j + serverIdx) % numServers); - List serverRegions = new ArrayList(max); - for (int i=regionIdx; i - * Takes a map of all regions to their existing assignment from META. Also - * takes a list of online servers for regions to be assigned to. Attempts to - * retain all assignment, so in some instances initial assignment will not be - * completely balanced. - *

- * Any leftover regions without an existing server to be assigned to will be - * assigned randomly to available servers. - * @param regions regions and existing assignment from meta - * @param servers available servers - * @return map of servers and regions to be assigned to them - */ - public Map> retainAssignment( - Map regions, List servers) { - // Group all of the old assignments by their hostname. - // We can't group directly by ServerName since the servers all have - // new start-codes. - - // Group the servers by their hostname. It's possible we have multiple - // servers on the same host on different ports. - ArrayListMultimap serversByHostname = - ArrayListMultimap.create(); - for (ServerName server : servers) { - serversByHostname.put(server.getHostname(), server); - } - - // Now come up with new assignments - Map> assignments = - new TreeMap>(); - - for (ServerName server : servers) { - assignments.put(server, new ArrayList()); - } - - // Collection of the hostnames that used to have regions - // assigned, but for which we no longer have any RS running - // after the cluster restart. - Set oldHostsNoLongerPresent = Sets.newTreeSet(); - - int numRandomAssignments = 0; - int numRetainedAssigments = 0; - for (Map.Entry entry : regions.entrySet()) { - HRegionInfo region = entry.getKey(); - ServerName oldServerName = entry.getValue(); - List localServers = new ArrayList(); - if (oldServerName != null) { - localServers = serversByHostname.get(oldServerName.getHostname()); - } - if (localServers.isEmpty()) { - // No servers on the new cluster match up with this hostname, - // assign randomly. - ServerName randomServer = servers.get(RANDOM.nextInt(servers.size())); - assignments.get(randomServer).add(region); - numRandomAssignments++; - if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname()); - } else if (localServers.size() == 1) { - // the usual case - one new server on same host - assignments.get(localServers.get(0)).add(region); - numRetainedAssigments++; - } else { - // multiple new servers in the cluster on this same host - int size = localServers.size(); - ServerName target = localServers.get(RANDOM.nextInt(size)); - assignments.get(target).add(region); - numRetainedAssigments++; - } - } - - String randomAssignMsg = ""; - if (numRandomAssignments > 0) { - randomAssignMsg = numRandomAssignments + " regions were assigned " + - "to random hosts, since the old hosts for these regions are no " + - "longer present in the cluster. These hosts were:\n " + - Joiner.on("\n ").join(oldHostsNoLongerPresent); - } - - LOG.info("Reassigned " + regions.size() + " regions. " + - numRetainedAssigments + " retained the pre-restart assignment. " + - randomAssignMsg); - return assignments; - } - - /** - * Returns an ordered list of hosts that are hosting the blocks for this - * region. The weight of each host is the sum of the block lengths of all - * files on that host, so the first host in the list is the server which - * holds the most bytes of the given region's HFiles. 
- * - * @param fs the filesystem - * @param region region - * @return ordered list of hosts holding blocks of the specified region - */ - @SuppressWarnings("unused") - private List getTopBlockLocations(FileSystem fs, - HRegionInfo region) { - List topServerNames = null; - try { - HTableDescriptor tableDescriptor = getTableDescriptor( - region.getTableName()); - if (tableDescriptor != null) { - HDFSBlocksDistribution blocksDistribution = - HRegion.computeHDFSBlocksDistribution(config, tableDescriptor, - region.getEncodedName()); - List topHosts = blocksDistribution.getTopHosts(); - topServerNames = mapHostNameToServerName(topHosts); - } - } catch (IOException ioe) { - LOG.debug("IOException during HDFSBlocksDistribution computation. for " + - "region = " + region.getEncodedName() , ioe); - } - - return topServerNames; - } - - /** - * return HTableDescriptor for a given tableName - * @param tableName the table name - * @return HTableDescriptor - * @throws IOException - */ - private HTableDescriptor getTableDescriptor(byte[] tableName) - throws IOException { - HTableDescriptor tableDescriptor = null; - try { - if ( this.services != null) - { - tableDescriptor = this.services.getTableDescriptors(). - get(Bytes.toString(tableName)); - } - } catch (FileNotFoundException fnfe) { - LOG.debug("FileNotFoundException during getTableDescriptors." - + " Current table name = " + Bytes.toStringBinary(tableName), fnfe); - } - - return tableDescriptor; - } - - /** - * Map hostname to ServerName, The output ServerName list will have the same - * order as input hosts. - * @param hosts the list of hosts - * @return ServerName list - */ - private List mapHostNameToServerName(List hosts) { - if ( hosts == null || status == null) { - return null; - } - - List topServerNames = new ArrayList(); - Collection regionServers = status.getServers(); - - // create a mapping from hostname to ServerName for fast lookup - HashMap hostToServerName = - new HashMap(); - for (ServerName sn : regionServers) { - hostToServerName.put(sn.getHostname(), sn); - } - - for (String host : hosts ) { - ServerName sn = hostToServerName.get(host); - // it is possible that HDFS is up ( thus host is valid ), - // but RS is down ( thus sn is null ) - if (sn != null) { - topServerNames.add(sn); - } - } - return topServerNames; - } - - - /** - * Generates an immediate assignment plan to be used by a new master for - * regions in transition that do not have an already known destination. - * - * Takes a list of regions that need immediate assignment and a list of - * all available servers. Returns a map of regions to the server they - * should be assigned to. - * - * This method will return quickly and does not do any intelligent - * balancing. The goal is to make a fast decision not the best decision - * possible. - * - * Currently this is random. 
- * - * @param regions - * @param servers - * @return map of regions to the server it should be assigned to - */ - public Map immediateAssignment( - List regions, List servers) { - Map assignments = - new TreeMap(); - for(HRegionInfo region : regions) { - assignments.put(region, servers.get(RANDOM.nextInt(servers.size()))); - } - return assignments; - } - - public ServerName randomAssignment(HRegionInfo regionInfo, - List servers) { - if (servers == null || servers.isEmpty()) { - LOG.warn("Wanted to do random assignment but no servers to assign to"); - return null; - } - return servers.get(RANDOM.nextInt(servers.size())); - } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerFactory.java similarity index 83% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerFactory.java index de6f1fb6fcc..68a0887ff83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerFactory.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.hadoop.hbase.master; +package org.apache.hadoop.hbase.master.balancer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.util.ReflectionUtils; /** @@ -39,9 +37,9 @@ public class LoadBalancerFactory { public static LoadBalancer getLoadBalancer(Configuration conf) { // Create the balancer - Class balancerKlass = conf.getClass( - HConstants.HBASE_MASTER_LOADBALANCER_CLASS, - DefaultLoadBalancer.class, LoadBalancer.class); + Class balancerKlass = + conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, DefaultLoadBalancer.class, + LoadBalancer.class); return ReflectionUtils.newInstance(balancerKlass, conf); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionInfoComparator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionInfoComparator.java new file mode 100644 index 00000000000..51d8c887cc7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionInfoComparator.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.Comparator; + +import org.apache.hadoop.hbase.HRegionInfo; + +/** + * The following comparator assumes that RegionId from HRegionInfo can represent + * the age of the region - larger RegionId means the region is younger. This + * comparator is used in balanceCluster() to account for the out-of-band regions + * which were assigned to the server after some other region server crashed. + */ +class RegionInfoComparator implements Comparator { + @Override + public int compare(HRegionInfo l, HRegionInfo r) { + long diff = r.getRegionId() - l.getRegionId(); + if (diff < 0) return -1; + if (diff > 0) return 1; + return 0; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java new file mode 100644 index 00000000000..00b48f99edf --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + +/** + * This will find where data for a region is located in HDFS. It ranks + * {@link ServerName}'s by the size of the store files they are holding for a + * given region. 
+ * + */ +class RegionLocationFinder { + + private static Log LOG = LogFactory.getLog(RegionLocationFinder.class); + + private Configuration conf; + private ClusterStatus status; + private MasterServices services; + + private CacheLoader> loader = + new CacheLoader>() { + + @Override + public List load(HRegionInfo key) throws Exception { + List servers = internalGetTopBlockLocation(key); + if (servers == null) { + return new LinkedList(); + } + return servers; + } + }; + + // The cache for where regions are located. + private LoadingCache> cache = null; + + /** + * Create a cache for region to list of servers + * @param mins Number of mins to cache + * @return A new Cache. + */ + private LoadingCache> createCache(int mins) { + return CacheBuilder.newBuilder().expireAfterAccess(mins, TimeUnit.MINUTES).build(loader); + } + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + cache = createCache(conf.getInt("hbase.master.balancer.regionLocationCacheTime", 30)); + } + + public void setServices(MasterServices services) { + this.services = services; + } + + public void setClusterStatus(ClusterStatus status) { + this.status = status; + } + + protected List getTopBlockLocations(HRegionInfo region) { + List servers = null; + try { + servers = cache.get(region); + } catch (ExecutionException ex) { + servers = new LinkedList(); + } + return servers; + + } + + /** + * Returns an ordered list of hosts that are hosting the blocks for this + * region. The weight of each host is the sum of the block lengths of all + * files on that host, so the first host in the list is the server which holds + * the most bytes of the given region's HFiles. + * + * @param fs the filesystem + * @param region region + * @return ordered list of hosts holding blocks of the specified region + */ + protected List internalGetTopBlockLocation(HRegionInfo region) { + List topServerNames = null; + try { + HTableDescriptor tableDescriptor = getTableDescriptor(region.getTableName()); + if (tableDescriptor != null) { + HDFSBlocksDistribution blocksDistribution = + HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, + region.getEncodedName()); + List topHosts = blocksDistribution.getTopHosts(); + topServerNames = mapHostNameToServerName(topHosts); + } + } catch (IOException ioe) { + LOG.debug("IOException during HDFSBlocksDistribution computation. for " + "region = " + + region.getEncodedName(), ioe); + } + + return topServerNames; + } + + /** + * return HTableDescriptor for a given tableName + * + * @param tableName the table name + * @return HTableDescriptor + * @throws IOException + */ + protected HTableDescriptor getTableDescriptor(byte[] tableName) throws IOException { + HTableDescriptor tableDescriptor = null; + try { + if (this.services != null) { + tableDescriptor = this.services.getTableDescriptors().get(Bytes.toString(tableName)); + } + } catch (FileNotFoundException fnfe) { + LOG.debug("FileNotFoundException during getTableDescriptors." + " Current table name = " + + Bytes.toStringBinary(tableName), fnfe); + } + + return tableDescriptor; + } + + /** + * Map hostname to ServerName, The output ServerName list will have the same + * order as input hosts. 
+ * + * @param hosts the list of hosts + * @return ServerName list + */ + protected List mapHostNameToServerName(List hosts) { + if (hosts == null || status == null) { + return null; + } + + List topServerNames = new ArrayList(); + Collection regionServers = status.getServers(); + + // create a mapping from hostname to ServerName for fast lookup + HashMap hostToServerName = new HashMap(); + for (ServerName sn : regionServers) { + hostToServerName.put(sn.getHostname(), sn); + } + + for (String host : hosts) { + ServerName sn = hostToServerName.get(host); + // it is possible that HDFS is up ( thus host is valid ), + // but RS is down ( thus sn is null ) + if (sn != null) { + topServerNames.add(sn); + } + } + return topServerNames; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerAndLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ServerAndLoad.java similarity index 77% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerAndLoad.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ServerAndLoad.java index 8c7dadf0010..93298d8faf8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerAndLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ServerAndLoad.java @@ -15,8 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.master; +package org.apache.hadoop.hbase.master.balancer; +import java.io.Serializable; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.ServerName; @@ -25,7 +26,8 @@ import org.apache.hadoop.hbase.ServerName; * Data structure that holds servername and 'load'. */ @InterfaceAudience.Private -class ServerAndLoad implements Comparable { +class ServerAndLoad implements Comparable, Serializable { + private static final long serialVersionUID = 2735470854607296965L; private final ServerName sn; private final int load; @@ -47,4 +49,13 @@ class ServerAndLoad implements Comparable { int diff = this.load - other.load; return diff != 0 ? diff : this.sn.compareTo(other.getServerName()); } + + @Override + public boolean equals(Object o) { + if (o instanceof ServerAndLoad) { + ServerAndLoad sl = (ServerAndLoad) o; + return this.compareTo(sl) == 0; + } + return false; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java new file mode 100644 index 00000000000..660cd900d28 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -0,0 +1,696 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerLoad; +import org.apache.hadoop.hbase.HServerLoad.RegionLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; + +/** + *

+ * <p>This is a best effort load balancer. Given a cost function F(C) => x, it will
+ * randomly try and mutate the cluster to Cprime. If F(Cprime) < F(C) then the
+ * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
+ * <ul>
+ *   <li>Region Load</li>
+ *   <li>Table Load</li>
+ *   <li>Data Locality</li>
+ *   <li>Memstore Sizes</li>
+ *   <li>Storefile Sizes</li>
+ * </ul>
+ *
+ * <p>Every cost function returns a number between 0 and 1 inclusive, where 0 is the lowest-cost,
+ * best solution and 1 is the highest possible cost and the worst solution. The computed costs are
+ * scaled by their respective multipliers:</p>
+ * <ul>
+ *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
+ *   <li>hbase.master.balancer.stochastic.moveCost</li>
+ *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
+ *   <li>hbase.master.balancer.stochastic.localityCost</li>
+ *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
+ *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
+ * </ul>
+ *
+ * <p>In addition to the above configurations, the balancer can be tuned by the following
+ * configuration values:</p>
+ * <ul>
+ *   <li>hbase.master.balancer.stochastic.maxMoveRegions, which controls the maximum number of
+ *   regions that can be moved in a single invocation of this balancer.</li>
+ *   <li>hbase.master.balancer.stochastic.stepsPerRegion, the coefficient by which the number of
+ *   regions is multiplied to get the number of times the balancer will mutate all servers.</li>
+ *   <li>hbase.master.balancer.stochastic.maxSteps, which controls the maximum number of times that
+ *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
+ *   value and the above computation.</li>
+ * </ul>
+ *
+ * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
+ * so that the balancer gets the full picture of all loads on the cluster.</p>
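+ *
+ * <p>A minimal usage sketch: the master can be pointed at this balancer through
+ * {@link LoadBalancerFactory}; the maxSteps value below is only an illustrative example,
+ * not a recommendation:</p>
+ * <pre>
+ *   Configuration conf = HBaseConfiguration.create();
+ *   // Ask the factory for the stochastic balancer instead of the default balancer.
+ *   conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+ *       StochasticLoadBalancer.class, LoadBalancer.class);
+ *   // Optionally bound the random walk; 20000 is only an example value.
+ *   conf.setInt("hbase.master.balancer.stochastic.maxSteps", 20000);
+ *   LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf);
+ * </pre>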
+ */ +@InterfaceAudience.Private +public class StochasticLoadBalancer extends BaseLoadBalancer { + + private static final String STOREFILE_SIZE_COST_KEY = + "hbase.master.balancer.stochastic.storefileSizeCost"; + private static final String MEMSTORE_SIZE_COST_KEY = + "hbase.master.balancer.stochastic.memstoreSizeCost"; + private static final String WRITE_REQUEST_COST_KEY = + "hbase.master.balancer.stochastic.writeRequestCost"; + private static final String READ_REQUEST_COST_KEY = + "hbase.master.balancer.stochastic.readRequestCost"; + private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost"; + private static final String TABLE_LOAD_COST_KEY = + "hbase.master.balancer.stochastic.tableLoadCost"; + private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost"; + private static final String REGION_LOAD_COST_KEY = + "hbase.master.balancer.stochastic.regionLoadCost"; + private static final String STEPS_PER_REGION_KEY = + "hbase.master.balancer.stochastic.stepsPerRegion"; + private static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps"; + private static final String MAX_MOVES_KEY = "hbase.master.balancer.stochastic.maxMoveRegions"; + + private static final Random RANDOM = new Random(System.currentTimeMillis()); + private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class); + private final RegionLocationFinder regionFinder = new RegionLocationFinder(); + private ClusterStatus clusterStatus = null; + private Map loads = new HashMap(); + + // values are defaults + private int maxSteps = 15000; + private int stepsPerRegion = 110; + private int maxMoves = 600; + private float loadMultiplier = 55; + private float moveCostMultiplier = 5; + private float tableMultiplier = 5; + private float localityMultiplier = 5; + private float readRequestMultiplier = 0; + private float writeRequestMultiplier = 0; + private float memStoreSizeMultiplier = 5; + private float storeFileSizeMultiplier = 5; + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + regionFinder.setConf(conf); + + maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps); + maxMoves = conf.getInt(MAX_MOVES_KEY, maxMoves); + stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion); + + // Load multiplier should be the greatest as it is the most general way to balance data. + loadMultiplier = conf.getFloat(REGION_LOAD_COST_KEY, loadMultiplier); + + // Move cost multiplier should be the same cost or higer than the rest of the costs to ensure + // that two costs must get better to justify a move cost. + moveCostMultiplier = conf.getFloat(MOVE_COST_KEY, moveCostMultiplier); + + // These are the added costs so that the stochastic load balancer can get a little bit smarter + // about where to move regions. + tableMultiplier = conf.getFloat(TABLE_LOAD_COST_KEY, tableMultiplier); + localityMultiplier = conf.getFloat(LOCALITY_COST_KEY, localityMultiplier); + memStoreSizeMultiplier = conf.getFloat(MEMSTORE_SIZE_COST_KEY, memStoreSizeMultiplier); + storeFileSizeMultiplier = conf.getFloat(STOREFILE_SIZE_COST_KEY, storeFileSizeMultiplier); + + // These are not used currently. + // TODO: Start using these once rolling averages are implemented for read/write load. 
+ readRequestMultiplier = conf.getFloat(READ_REQUEST_COST_KEY, readRequestMultiplier); + writeRequestMultiplier = conf.getFloat(WRITE_REQUEST_COST_KEY, writeRequestMultiplier); + } + + @Override + public void setClusterStatus(ClusterStatus st) { + super.setClusterStatus(st); + regionFinder.setClusterStatus(st); + this.clusterStatus = st; + updateRegionLoad(); + } + + @Override + public void setMasterServices(MasterServices masterServices) { + super.setMasterServices(masterServices); + this.services = masterServices; + this.regionFinder.setServices(masterServices); + } + + /** + * Given the cluster state this will try and approach an optimal balance. This + * should always approach the optimal state given enough steps. + */ + @Override + public List balanceCluster(Map> clusterState) { + + // No need to balance a one node cluster. + if (clusterState.size() <= 1) { + LOG.debug("Skipping load balance as cluster has only one node."); + return null; + } + + long startTime = System.currentTimeMillis(); + + // Keep track of servers to iterate through them. + List servers = new ArrayList(clusterState.keySet()); + Map initialRegionMapping = createRegionMapping(clusterState); + double currentCost, newCost, initCost; + currentCost = newCost = initCost = computeCost(initialRegionMapping, clusterState); + + int computedMaxSteps = + Math.min(this.maxSteps, (initialRegionMapping.size() * this.stepsPerRegion)); + // Perform a stochastic walk to see if we can get a good fit. + for (int step = 0; step < computedMaxSteps; step++) { + + // try and perform a mutation + for (ServerName leftServer : servers) { + + // What server are we going to be swapping regions with ? + ServerName rightServer = pickOtherServer(leftServer, servers); + if (rightServer == null) { + continue; + } + + // Get the regions. + List leftRegionList = clusterState.get(leftServer); + List rightRegionList = clusterState.get(rightServer); + + // Pick what regions to swap around. + // If we get a null for one then this isn't a swap just a move + HRegionInfo lRegion = pickRandomRegion(leftRegionList, 0); + HRegionInfo rRegion = pickRandomRegion(rightRegionList, 0.5); + + // We randomly picked to do nothing. + if (lRegion == null && rRegion == null) { + continue; + } + + if (rRegion != null) { + leftRegionList.add(rRegion); + } + + if (lRegion != null) { + rightRegionList.add(lRegion); + } + + newCost = computeCost(initialRegionMapping, clusterState); + + // Should this be kept? + if (newCost < currentCost) { + currentCost = newCost; + } else { + // Put things back the way they were before. + if (rRegion != null) { + leftRegionList.remove(rRegion); + rightRegionList.add(rRegion); + } + + if (lRegion != null) { + rightRegionList.remove(lRegion); + leftRegionList.add(lRegion); + } + } + } + + } + + long endTime = System.currentTimeMillis(); + + if (initCost > currentCost) { + List plans = createRegionPlans(initialRegionMapping, clusterState); + + LOG.debug("Finished computing new laod balance plan. Computation took " + + (endTime - startTime) + "ms to try " + computedMaxSteps + + " different iterations. Found a solution that moves " + plans.size() + + " regions; Going from a computed cost of " + initCost + " to a new cost of " + + currentCost); + return plans; + } + LOG.debug("Could not find a better load balance plan. 
Tried " + computedMaxSteps + + " different configurations in " + (endTime - startTime) + + "ms, and did not find anything with a computed cost less than " + initCost); + return null; + } + + /** + * Create all of the RegionPlan's needed to move from the initial cluster state to the desired + * state. + * + * @param initialRegionMapping Initial mapping of Region to Server + * @param clusterState The desired mapping of ServerName to Regions + * @return List of RegionPlan's that represent the moves needed to get to desired final state. + */ + private List createRegionPlans(Map initialRegionMapping, + Map> clusterState) { + List plans = new LinkedList(); + + for (Entry> entry : clusterState.entrySet()) { + ServerName newServer = entry.getKey(); + + for (HRegionInfo region : entry.getValue()) { + ServerName initialServer = initialRegionMapping.get(region); + if (!newServer.equals(initialServer)) { + LOG.trace("Moving Region " + region.getEncodedName() + " from server " + + initialServer.getHostname() + " to " + newServer.getHostname()); + RegionPlan rp = new RegionPlan(region, initialServer, newServer); + plans.add(rp); + } + } + } + return plans; + } + + /** + * Create a map that will represent the initial location of regions on a + * {@link ServerName} + * + * @param clusterState starting state of the cluster and regions. + * @return A map of {@link HRegionInfo} to the {@link ServerName} that is + * currently hosting that region + */ + private Map createRegionMapping( + Map> clusterState) { + Map mapping = new HashMap(); + + for (Entry> entry : clusterState.entrySet()) { + for (HRegionInfo region : entry.getValue()) { + mapping.put(region, entry.getKey()); + } + } + return mapping; + } + + /** Store the current region loads. */ + private void updateRegionLoad() { + loads.clear(); + for (ServerName sn : clusterStatus.getServers()) { + HServerLoad hsl = clusterStatus.getLoad(sn); + if (hsl == null) continue; + for (Entry entry : hsl.getRegionsLoad().entrySet()) { + loads.put(Bytes.toString(entry.getKey()), entry.getValue()); + + } + } + } + + /** + * From a list of regions pick a random one. Null can be returned which + * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move + * rather than swap. + * + * @param regions list of regions. + * @param chanceOfNoSwap Chance that this will decide to try a move rather + * than a swap. + * @return a random {@link HRegionInfo} or null if an asymmetrical move is + * suggested. + */ + private HRegionInfo pickRandomRegion(List regions, double chanceOfNoSwap) { + + //Check to see if this is just a move. + if (regions.isEmpty() || RANDOM.nextFloat() < chanceOfNoSwap) { + //signal a move only. + return null; + } + + int count = 0; + HRegionInfo r = null; + + //We will try and find a region up to 10 times. If we always + while (count < 10 && r == null ) { + count++; + r = regions.get(RANDOM.nextInt(regions.size())); + + // If this is a special region we always try not to move it. + // so clear out r. try again + if (r.isMetaRegion() || r.isRootRegion() ) { + r = null; + } + } + if (r != null) { + regions.remove(r); + } + return r; + } + + /** + * Given a server we will want to switch regions with another server. This + * function picks a random server from the list. + * + * @param server Current Server. This server will never be the return value. + * @param allServers list of all server from which to pick + * @return random server. Null if no other servers were found. 
+ */ + private ServerName pickOtherServer(ServerName server, List allServers) { + ServerName s = null; + int count = 0; + while (count < 100 && (s == null || s.equals(server))) { + count++; + s = allServers.get(RANDOM.nextInt(allServers.size())); + } + + // If nothing but the current server was found return null. + return (s == null || s.equals(server)) ? null : s; + } + + /** + * This is the main cost function. It will compute a cost associated with a proposed cluster + * state. All different costs will be combined with their multipliers to produce a double cost. + * + * @param initialRegionMapping Map of where the regions started. + * @param clusterState Map of ServerName to list of regions. + * @return a double of a cost associated with the proposed + */ + protected double computeCost(Map initialRegionMapping, + Map> clusterState) { + + double moveCost = moveCostMultiplier * computeMoveCost(initialRegionMapping, clusterState); + + double regionCountSkewCost = loadMultiplier * computeSkewLoadCost(clusterState); + double tableSkewCost = tableMultiplier * computeTableSkewLoadCost(clusterState); + double localityCost = + localityMultiplier * computeDataLocalityCost(initialRegionMapping, clusterState); + + // TODO: Add Read and Write requests back in here after keeping a running average on per + // region load metrics. + double memstoreSizeCost = + memStoreSizeMultiplier + * computeRegionLoadCost(clusterState, RegionLoadCostType.MEMSTORE_SIZE); + double storefileSizeCost = + storeFileSizeMultiplier + * computeRegionLoadCost(clusterState, RegionLoadCostType.STOREFILE_SIZE); + double total = + moveCost + regionCountSkewCost + tableSkewCost + localityCost + memstoreSizeCost + + storefileSizeCost; + LOG.trace("Computed weights for a potential balancing total = " + total + " moveCost = " + + moveCost + " regionCountSkewCost = " + regionCountSkewCost + " tableSkewCost = " + + tableSkewCost + " localityCost = " + localityCost + " memstoreSizeCost = " + + memstoreSizeCost + " storefileSizeCost = " + storefileSizeCost); + return total; + } + + /** + * Given the starting state of the regions and a potential ending state + * compute cost based upon the number of regions that have moved. + * + * @param initialRegionMapping The starting location of regions. + * @param clusterState The potential new cluster state. + * @return The cost. Between 0 and 1. + */ + double computeMoveCost(Map initialRegionMapping, + Map> clusterState) { + float moveCost = 0; + for (Entry> entry : clusterState.entrySet()) { + for (HRegionInfo region : entry.getValue()) { + if (initialRegionMapping.get(region) != entry.getKey()) { + moveCost += 1; + } + } + } + + //Don't let this single balance move more than the max moves. + //This allows better scaling to accurately represent the actual cost of a move. + if (moveCost > maxMoves) { + return 10000; //return a number much greater than any of the other cost functions + } + + return scale(0, Math.min(maxMoves, initialRegionMapping.size()), moveCost); + } + + /** + * Compute the cost of a potential cluster state from skew in number of + * regions on a cluster + * + * @param clusterState The proposed cluster state + * @return The cost of region load imbalance. 
+ */ + double computeSkewLoadCost(Map> clusterState) { + DescriptiveStatistics stats = new DescriptiveStatistics(); + for (List regions : clusterState.values()) { + int size = regions.size(); + stats.addValue(size); + } + return costFromStats(stats); + } + + /** + * Compute the cost of a potential cluster configuration based upon how evenly + * distributed tables are. + * + * @param clusterState Proposed cluster state. + * @return Cost of imbalance in table. + */ + double computeTableSkewLoadCost(Map> clusterState) { + + Map tableRegionsTotal = new HashMap(); + Map tableRegionsOnCurrentServer = new HashMap(); + Map tableCostSeenSoFar = new HashMap(); + // Go through everything per server + for (Entry> entry : clusterState.entrySet()) { + tableRegionsOnCurrentServer.clear(); + + // For all of the regions count how many are from each table + for (HRegionInfo region : entry.getValue()) { + String tableName = region.getTableNameAsString(); + + // See if this table already has a count on this server + MutableInt regionsOnServerCount = tableRegionsOnCurrentServer.get(tableName); + + // If this is the first time we've seen this table on this server + // create a new mutable int. + if (regionsOnServerCount == null) { + regionsOnServerCount = new MutableInt(0); + tableRegionsOnCurrentServer.put(tableName, regionsOnServerCount); + } + + // Increment the count of how many regions from this table are host on + // this server + regionsOnServerCount.increment(); + + // Now count the number of regions in this table. + MutableInt totalCount = tableRegionsTotal.get(tableName); + + // If this is the first region from this table create a new counter for + // this table. + if (totalCount == null) { + totalCount = new MutableInt(0); + tableRegionsTotal.put(tableName, totalCount); + } + totalCount.increment(); + } + + // Now go through all of the tables we have seen and keep the max number + // of regions of this table a single region server is hosting. + for (String tableName : tableRegionsOnCurrentServer.keySet()) { + Integer thisCount = tableRegionsOnCurrentServer.get(tableName).toInteger(); + Integer maxCountSoFar = tableCostSeenSoFar.get(tableName); + + if (maxCountSoFar == null || thisCount.compareTo(maxCountSoFar) > 0) { + tableCostSeenSoFar.put(tableName, thisCount); + } + + } + } + + double max = 0; + double min = 0; + double value = 0; + + // Compute the min, value, and max. + for (String tableName : tableRegionsTotal.keySet()) { + max += tableRegionsTotal.get(tableName).doubleValue(); + min += tableRegionsTotal.get(tableName).doubleValue() / (double) clusterState.size(); + value += tableCostSeenSoFar.get(tableName).doubleValue(); + + } + return scale(min, max, value); + } + + /** + * Compute a cost of a potential cluster configuration based upon where + * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located. + * + * @param clusterState The state of the cluster + * @return A cost between 0 and 1. 0 Means all regions are on the sever with + * the most local store files. + */ + double computeDataLocalityCost(Map initialRegionMapping, + Map> clusterState) { + + double max = 0; + double cost = 0; + + // If there's no master so there's no way anything else works. + if (this.services == null) return cost; + + for (Entry> entry : clusterState.entrySet()) { + ServerName sn = entry.getKey(); + for (HRegionInfo region : entry.getValue()) { + + max += 1; + + // Only compute the data locality for moved regions. 
+ if (initialRegionMapping.equals(sn)) { + continue; + } + + List dataOnServers = regionFinder.getTopBlockLocations(region); + + // If we can't find where the data is getTopBlock returns null. + // so count that as being the best possible. + if (dataOnServers == null) { + continue; + } + + int index = dataOnServers.indexOf(sn); + if (index < 0) { + cost += 1; + } else { + cost += (double) index / (double) dataOnServers.size(); + } + + } + } + return scale(0, max, cost); + } + + /** The cost's that can be derived from RegionLoad */ + private enum RegionLoadCostType { + READ_REQUEST, WRITE_REQUEST, MEMSTORE_SIZE, STOREFILE_SIZE + } + + /** + * Compute the cost of the current cluster state due to some RegionLoadCost type + * + * @param clusterState the cluster + * @param costType what type of cost to consider + * @return the scaled cost. + */ + private double computeRegionLoadCost(Map> clusterState, + RegionLoadCostType costType) { + + if (this.clusterStatus == null || this.loads == null || this.loads.size() == 0) return 0; + + DescriptiveStatistics stats = new DescriptiveStatistics(); + + // For every server look at the cost of each region + for (List regions : clusterState.values()) { + long cost = 0; //Cost this server has from RegionLoad + + // For each region + for (HRegionInfo region : regions) { + // Try and get the region using the regionNameAsString + RegionLoad rl = loads.get(region.getRegionNameAsString()); + + // That could have failed if the RegionLoad is using the other regionName + if (rl == null) { + // Try getting the region load using encoded name. + rl = loads.get(region.getEncodedName()); + } + // Now if we found a region load get the type of cost that was requested. + if (rl != null) { + cost += getRegionLoadCost(rl, costType); + } + } + + // Add the total cost to the stats. + stats.addValue(cost); + } + + // No return the scaled cost from data held in the stats object. + return costFromStats(stats); + } + + /** + * Get the un-scaled cost from a RegionLoad + * + * @param rl the Region load + * @param type The type of cost to extract + * @return the double representing the cost + */ + private double getRegionLoadCost(RegionLoad rl, RegionLoadCostType type) { + switch (type) { + case READ_REQUEST: + return rl.getReadRequestsCount(); + case WRITE_REQUEST: + return rl.getWriteRequestsCount(); + case MEMSTORE_SIZE: + return rl.getMemStoreSizeMB(); + case STOREFILE_SIZE: + return rl.getStorefileSizeMB(); + default: + assert false : "RegionLoad cost type not supported."; + return 0; + } + } + + /** + * Function to compute a scaled cost using {@link DescriptiveStatistics}. It + * assumes that this is a zero sum set of costs. It assumes that the worst case + * possible is all of the elements in one region server and the rest having 0. + * + * @param stats the costs + * @return a scaled set of costs. + */ + double costFromStats(DescriptiveStatistics stats) { + double totalCost = 0; + double mean = stats.getMean(); + + //Compute max as if all region servers had 0 and one had the sum of all costs. This must be + // a zero sum cost for this to make sense. + double max = ((stats.getN() - 1) * stats.getMean()) + (stats.getSum() - stats.getMean()); + for (double n : stats.getValues()) { + totalCost += Math.abs(mean - n); + + } + + return scale(0, max, totalCost); + } + + /** + * Scale the value between 0 and 1. + * + * @param min Min value + * @param max The Max value + * @param value The value to be scaled. + * @return The scaled value. 
+ */ + private double scale(double min, double max, double value) { + if (max == 0 || value == 0) { + return 0; + } + + return Math.max(0d, Math.min(1d, (value - min) / max)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java index aa74cd5e24c..63dc8dc224b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import org.apache.commons.logging.Log; @@ -37,35 +38,52 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** - * Test whether region rebalancing works. (HBASE-71) + * Test whether region re-balancing works. (HBASE-71) */ @Category(LargeTests.class) +@RunWith(value = Parameterized.class) public class TestRegionRebalancing { - final Log LOG = LogFactory.getLog(this.getClass().getName()); - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - HTable table; - HTableDescriptor desc; - private static final byte [] FAMILY_NAME = Bytes.toBytes("col"); - @BeforeClass - public static void beforeClass() throws Exception { - UTIL.startMiniCluster(1); + @Parameters + public static Collection data() { + Object[][] balancers = + new String[][] { { "org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer" }, + { "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer" } }; + return Arrays.asList(balancers); } - @AfterClass - public static void afterClass() throws Exception { + private static final byte[] FAMILY_NAME = Bytes.toBytes("col"); + public static final Log LOG = LogFactory.getLog(TestRegionRebalancing.class); + private final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private HTable table; + private HTableDescriptor desc; + private String balancerName; + + public TestRegionRebalancing(String balancerName) { + this.balancerName = balancerName; + + } + + @After + public void after() throws Exception { UTIL.shutdownMiniCluster(); } @Before - public void before() { + public void before() throws Exception { + UTIL.getConfiguration().set("hbase.master.loadbalancer.class", this.balancerName); + UTIL.startMiniCluster(1); this.desc = new HTableDescriptor("test"); this.desc.addFamily(new HColumnDescriptor(FAMILY_NAME)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java index 7a6cc0d7b16..25ea00cccb6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java @@ -34,12 +34,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import 
org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.ServerLoad; -import org.apache.hadoop.hbase.master.AssignmentManager.RegionState; -import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.catalog.CatalogTracker; @@ -52,13 +50,15 @@ import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.master.AssignmentManager.RegionState; import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State; +import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer; +import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDefaultLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDefaultLoadBalancer.java deleted file mode 100644 index c9608cb9a85..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDefaultLoadBalancer.java +++ /dev/null @@ -1,513 +0,0 @@ -/** - * Copyright 2011 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.master; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Random; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - - -/** - * Test the load balancer that is created by default. - */ -@Category(SmallTests.class) -public class TestDefaultLoadBalancer { - private static final Log LOG = LogFactory.getLog(TestDefaultLoadBalancer.class); - - private static LoadBalancer loadBalancer; - - private static Random rand; - - @BeforeClass - public static void beforeAllTests() throws Exception { - Configuration conf = HBaseConfiguration.create(); - conf.set("hbase.regions.slop", "0"); - loadBalancer = new DefaultLoadBalancer(); - loadBalancer.setConf(conf); - rand = new Random(); - } - - // int[testnum][servernumber] -> numregions - int [][] clusterStateMocks = new int [][] { - // 1 node - new int [] { 0 }, - new int [] { 1 }, - new int [] { 10 }, - // 2 node - new int [] { 0, 0 }, - new int [] { 2, 0 }, - new int [] { 2, 1 }, - new int [] { 2, 2 }, - new int [] { 2, 3 }, - new int [] { 2, 4 }, - new int [] { 1, 1 }, - new int [] { 0, 1 }, - new int [] { 10, 1 }, - new int [] { 14, 1432 }, - new int [] { 47, 53 }, - // 3 node - new int [] { 0, 1, 2 }, - new int [] { 1, 2, 3 }, - new int [] { 0, 2, 2 }, - new int [] { 0, 3, 0 }, - new int [] { 0, 4, 0 }, - new int [] { 20, 20, 0 }, - // 4 node - new int [] { 0, 1, 2, 3 }, - new int [] { 4, 0, 0, 0 }, - new int [] { 5, 0, 0, 0 }, - new int [] { 6, 6, 0, 0 }, - new int [] { 6, 2, 0, 0 }, - new int [] { 6, 1, 0, 0 }, - new int [] { 6, 0, 0, 0 }, - new int [] { 4, 4, 4, 7 }, - new int [] { 4, 4, 4, 8 }, - new int [] { 0, 0, 0, 7 }, - // 5 node - new int [] { 1, 1, 1, 1, 4 }, - // more nodes - new int [] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }, - new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 }, - new int [] { 6, 6, 5, 6, 6, 6, 6, 6, 6, 1 }, - new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 54 }, - new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 55 }, - new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 56 }, - new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 16 }, - new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 8 }, - new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 9 }, - new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 10 }, - new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 123 }, - new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 155 }, - new int [] { 0, 0, 144, 1, 1, 1, 1, 1123, 133, 138, 12, 1444 }, - new int [] { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 }, - new int [] { 1538, 1392, 1561, 1557, 1535, 1553, 1385, 1542, 1619 } - }; - - int [][] regionsAndServersMocks = new int [][] { - // { num regions, num servers } - new int [] { 0, 0 }, - new int [] { 0, 1 }, - new int [] { 1, 1 }, - new int [] { 2, 1 }, - new int [] { 10, 1 }, - new int [] { 1, 2 }, - new int [] { 2, 2 }, - new int [] { 3, 2 }, - new int [] { 1, 3 }, - new int [] { 2, 3 }, - new int [] { 3, 3 }, - new int [] { 25, 3 }, - new int [] { 2, 10 }, - new int [] { 2, 100 }, - new int [] { 12, 10 }, - new int [] { 12, 100 }, 
- }; - - /** - * Test the load balancing algorithm. - * - * Invariant is that all servers should be hosting either - * floor(average) or ceiling(average) - * - * @throws Exception - */ - @Test - public void testBalanceCluster() throws Exception { - - for(int [] mockCluster : clusterStateMocks) { - Map> servers = mockClusterServers(mockCluster); - List list = convertToList(servers); - LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); - List plans = loadBalancer.balanceCluster(servers); - List balancedCluster = reconcile(list, plans); - LOG.info("Mock Balance : " + printMock(balancedCluster)); - assertClusterAsBalanced(balancedCluster); - for(Map.Entry> entry : servers.entrySet()) { - returnRegions(entry.getValue()); - returnServer(entry.getKey()); - } - } - - } - - /** - * Invariant is that all servers have between floor(avg) and ceiling(avg) - * number of regions. - */ - public void assertClusterAsBalanced(List servers) { - int numServers = servers.size(); - int numRegions = 0; - int maxRegions = 0; - int minRegions = Integer.MAX_VALUE; - for(ServerAndLoad server : servers) { - int nr = server.getLoad(); - if(nr > maxRegions) { - maxRegions = nr; - } - if(nr < minRegions) { - minRegions = nr; - } - numRegions += nr; - } - if(maxRegions - minRegions < 2) { - // less than 2 between max and min, can't balance - return; - } - int min = numRegions / numServers; - int max = numRegions % numServers == 0 ? min : min + 1; - - for(ServerAndLoad server : servers) { - assertTrue(server.getLoad() <= max); - assertTrue(server.getLoad() >= min); - } - } - - /** - * Tests immediate assignment. - * - * Invariant is that all regions have an assignment. - * - * @throws Exception - */ - @Test - public void testImmediateAssignment() throws Exception { - for(int [] mock : regionsAndServersMocks) { - LOG.debug("testImmediateAssignment with " + mock[0] + " regions and " + mock[1] + " servers"); - List regions = randomRegions(mock[0]); - List servers = randomServers(mock[1], 0); - List list = getListOfServerNames(servers); - Map assignments = - loadBalancer.immediateAssignment(regions, list); - assertImmediateAssignment(regions, list, assignments); - returnRegions(regions); - returnServers(list); - } - } - - /** - * All regions have an assignment. - * @param regions - * @param servers - * @param assignments - */ - private void assertImmediateAssignment(List regions, - List servers, Map assignments) { - for(HRegionInfo region : regions) { - assertTrue(assignments.containsKey(region)); - } - } - - /** - * Tests the bulk assignment used during cluster startup. - * - * Round-robin. Should yield a balanced cluster so same invariant as the load - * balancer holds, all servers holding either floor(avg) or ceiling(avg). 
- * - * @throws Exception - */ - @Test - public void testBulkAssignment() throws Exception { - for(int [] mock : regionsAndServersMocks) { - LOG.debug("testBulkAssignment with " + mock[0] + " regions and " + mock[1] + " servers"); - List regions = randomRegions(mock[0]); - List servers = randomServers(mock[1], 0); - List list = getListOfServerNames(servers); - Map> assignments = - loadBalancer.roundRobinAssignment(regions, list); - float average = (float)regions.size()/servers.size(); - int min = (int)Math.floor(average); - int max = (int)Math.ceil(average); - if(assignments != null && !assignments.isEmpty()) { - for(List regionList : assignments.values()) { - assertTrue(regionList.size() == min || regionList.size() == max); - } - } - returnRegions(regions); - returnServers(list); - } - } - - /** - * Test the cluster startup bulk assignment which attempts to retain - * assignment info. - * @throws Exception - */ - @Test - public void testRetainAssignment() throws Exception { - // Test simple case where all same servers are there - List servers = randomServers(10, 10); - List regions = randomRegions(100); - Map existing = - new TreeMap(); - for (int i = 0; i < regions.size(); i++) { - ServerName sn = servers.get(i % servers.size()).getServerName(); - // The old server would have had same host and port, but different - // start code! - ServerName snWithOldStartCode = - new ServerName(sn.getHostname(), sn.getPort(), sn.getStartcode() - 10); - existing.put(regions.get(i), snWithOldStartCode); - } - List listOfServerNames = getListOfServerNames(servers); - Map> assignment = - loadBalancer.retainAssignment(existing, listOfServerNames); - assertRetainedAssignment(existing, listOfServerNames, assignment); - - // Include two new servers that were not there before - List servers2 = - new ArrayList(servers); - servers2.add(randomServer(10)); - servers2.add(randomServer(10)); - listOfServerNames = getListOfServerNames(servers2); - assignment = loadBalancer.retainAssignment(existing, listOfServerNames); - assertRetainedAssignment(existing, listOfServerNames, assignment); - - // Remove two of the servers that were previously there - List servers3 = - new ArrayList(servers); - servers3.remove(0); - servers3.remove(0); - listOfServerNames = getListOfServerNames(servers3); - assignment = loadBalancer.retainAssignment(existing, listOfServerNames); - assertRetainedAssignment(existing, listOfServerNames, assignment); - } - - private List getListOfServerNames(final List sals) { - List list = new ArrayList(); - for (ServerAndLoad e: sals) { - list.add(e.getServerName()); - } - return list; - } - - /** - * Asserts a valid retained assignment plan. - *

- * Must meet the following conditions: - * <ul> - * <li>Every input region has an assignment, and to an online server</li> - * <li>If a region had an existing assignment to a server with the same - * address a a currently online server, it will be assigned to it</li> - * </ul>
- * @param existing - * @param servers - * @param assignment - */ - private void assertRetainedAssignment( - Map existing, List servers, - Map> assignment) { - // Verify condition 1, every region assigned, and to online server - Set onlineServerSet = new TreeSet(servers); - Set assignedRegions = new TreeSet(); - for (Map.Entry> a : assignment.entrySet()) { - assertTrue("Region assigned to server that was not listed as online", - onlineServerSet.contains(a.getKey())); - for (HRegionInfo r : a.getValue()) assignedRegions.add(r); - } - assertEquals(existing.size(), assignedRegions.size()); - - // Verify condition 2, if server had existing assignment, must have same - Set onlineHostNames = new TreeSet(); - for (ServerName s : servers) { - onlineHostNames.add(s.getHostname()); - } - - for (Map.Entry> a : assignment.entrySet()) { - ServerName assignedTo = a.getKey(); - for (HRegionInfo r : a.getValue()) { - ServerName address = existing.get(r); - if (address != null && onlineHostNames.contains(address.getHostname())) { - // this region was prevously assigned somewhere, and that - // host is still around, then it should be re-assigned on the - // same host - assertEquals(address.getHostname(), assignedTo.getHostname()); - } - } - } - } - - private String printStats(List servers) { - int numServers = servers.size(); - int totalRegions = 0; - for(ServerAndLoad server : servers) { - totalRegions += server.getLoad(); - } - float average = (float)totalRegions / numServers; - int max = (int)Math.ceil(average); - int min = (int)Math.floor(average); - return "[srvr=" + numServers + " rgns=" + totalRegions + " avg=" + average + " max=" + max + " min=" + min + "]"; - } - - private List convertToList(final Map> servers) { - List list = - new ArrayList(servers.size()); - for (Map.Entry> e: servers.entrySet()) { - list.add(new ServerAndLoad(e.getKey(), e.getValue().size())); - } - return list; - } - - private String printMock(List balancedCluster) { - SortedSet sorted = - new TreeSet(balancedCluster); - ServerAndLoad [] arr = - sorted.toArray(new ServerAndLoad[sorted.size()]); - StringBuilder sb = new StringBuilder(sorted.size() * 4 + 4); - sb.append("{ "); - for(int i = 0; i < arr.length; i++) { - if (i != 0) { - sb.append(" , "); - } - sb.append(arr[i].getLoad()); - } - sb.append(" }"); - return sb.toString(); - } - - /** - * This assumes the RegionPlan HSI instances are the same ones in the map, so - * actually no need to even pass in the map, but I think it's clearer. 
- * @param list - * @param plans - * @return - */ - private List reconcile(List list, - List plans) { - List result = - new ArrayList(list.size()); - if (plans == null) return result; - Map map = - new HashMap(list.size()); - for (RegionPlan plan : plans) { - ServerName source = plan.getSource(); - updateLoad(map, source, -1); - ServerName destination = plan.getDestination(); - updateLoad(map, destination, +1); - } - result.clear(); - result.addAll(map.values()); - return result; - } - - private void updateLoad(Map map, - final ServerName sn, final int diff) { - ServerAndLoad sal = map.get(sn); - if (sal == null) return; - sal = new ServerAndLoad(sn, sal.getLoad() + diff); - map.put(sn, sal); - } - - private Map> mockClusterServers( - int [] mockCluster) { - int numServers = mockCluster.length; - Map> servers = - new TreeMap>(); - for(int i = 0; i < numServers; i++) { - int numRegions = mockCluster[i]; - ServerAndLoad sal = randomServer(0); - List regions = randomRegions(numRegions); - servers.put(sal.getServerName(), regions); - } - return servers; - } - - private Queue regionQueue = new LinkedList(); - static int regionId = 0; - - private List randomRegions(int numRegions) { - List regions = new ArrayList(numRegions); - byte [] start = new byte[16]; - byte [] end = new byte[16]; - rand.nextBytes(start); - rand.nextBytes(end); - for(int i=0;i regions) { - regionQueue.addAll(regions); - } - - private Queue serverQueue = new LinkedList(); - - private ServerAndLoad randomServer(final int numRegionsPerServer) { - if (!this.serverQueue.isEmpty()) { - ServerName sn = this.serverQueue.poll(); - return new ServerAndLoad(sn, numRegionsPerServer); - } - String host = "server" + rand.nextInt(100000); - int port = rand.nextInt(60000); - long startCode = rand.nextLong(); - ServerName sn = new ServerName(host, port, startCode); - return new ServerAndLoad(sn, numRegionsPerServer); - } - - private List randomServers(int numServers, int numRegionsPerServer) { - List servers = - new ArrayList(numServers); - for (int i = 0; i < numServers; i++) { - servers.add(randomServer(numRegionsPerServer)); - } - return servers; - } - - private void returnServer(ServerName server) { - serverQueue.add(server); - } - - private void returnServers(List servers) { - this.serverQueue.addAll(servers); - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} - diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java new file mode 100644 index 00000000000..b726db37633 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Random; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Class used to be the base of unit tests on load balancers. It gives helper + * methods to create maps of {@link ServerName} to lists of {@link HRegionInfo} + * and to check list of region plans. + * + */ +public class BalancerTestBase { + + private static Random rand = new Random(); + static int regionId = 0; + + /** + * Invariant is that all servers have between floor(avg) and ceiling(avg) + * number of regions. + */ + public void assertClusterAsBalanced(List servers) { + int numServers = servers.size(); + int numRegions = 0; + int maxRegions = 0; + int minRegions = Integer.MAX_VALUE; + for (ServerAndLoad server : servers) { + int nr = server.getLoad(); + if (nr > maxRegions) { + maxRegions = nr; + } + if (nr < minRegions) { + minRegions = nr; + } + numRegions += nr; + } + if (maxRegions - minRegions < 2) { + // less than 2 between max and min, can't balance + return; + } + int min = numRegions / numServers; + int max = numRegions % numServers == 0 ? min : min + 1; + + for (ServerAndLoad server : servers) { + assertTrue(server.getLoad() >= 0); + assertTrue(server.getLoad() <= max); + assertTrue(server.getLoad() >= min); + } + } + + protected String printStats(List servers) { + int numServers = servers.size(); + int totalRegions = 0; + for (ServerAndLoad server : servers) { + totalRegions += server.getLoad(); + } + float average = (float) totalRegions / numServers; + int max = (int) Math.ceil(average); + int min = (int) Math.floor(average); + return "[srvr=" + numServers + " rgns=" + totalRegions + " avg=" + average + " max=" + max + + " min=" + min + "]"; + } + + protected List convertToList(final Map> servers) { + List list = new ArrayList(servers.size()); + for (Map.Entry> e : servers.entrySet()) { + list.add(new ServerAndLoad(e.getKey(), e.getValue().size())); + } + return list; + } + + protected String printMock(List balancedCluster) { + SortedSet sorted = new TreeSet(balancedCluster); + ServerAndLoad[] arr = sorted.toArray(new ServerAndLoad[sorted.size()]); + StringBuilder sb = new StringBuilder(sorted.size() * 4 + 4); + sb.append("{ "); + for (int i = 0; i < arr.length; i++) { + if (i != 0) { + sb.append(" , "); + } + sb.append(arr[i].getServerName().getHostname()); + sb.append(":"); + sb.append(arr[i].getLoad()); + } + sb.append(" }"); + return sb.toString(); + } + + /** + * This assumes the RegionPlan HSI instances are the same ones in the map, so + * actually no need to even pass in the map, but I think it's clearer. 
+ * + * @param list + * @param plans + * @return + */ + protected List reconcile(List list, List plans) { + List result = new ArrayList(list.size()); + if (plans == null) return result; + Map map = new HashMap(list.size()); + for (ServerAndLoad sl : list) { + map.put(sl.getServerName(), sl); + } + for (RegionPlan plan : plans) { + ServerName source = plan.getSource(); + updateLoad(map, source, -1); + ServerName destination = plan.getDestination(); + updateLoad(map, destination, +1); + } + result.clear(); + result.addAll(map.values()); + return result; + } + + protected void updateLoad(final Map map, + final ServerName sn, + final int diff) { + ServerAndLoad sal = map.get(sn); + if (sal == null) sal = new ServerAndLoad(sn, 0); + sal = new ServerAndLoad(sn, sal.getLoad() + diff); + map.put(sn, sal); + } + + protected Map> mockClusterServers(int[] mockCluster) { + int numServers = mockCluster.length; + Map> servers = new TreeMap>(); + for (int i = 0; i < numServers; i++) { + int numRegions = mockCluster[i]; + ServerAndLoad sal = randomServer(0); + List regions = randomRegions(numRegions); + servers.put(sal.getServerName(), regions); + } + return servers; + } + + private Queue regionQueue = new LinkedList(); + + protected List randomRegions(int numRegions) { + List regions = new ArrayList(numRegions); + byte[] start = new byte[16]; + byte[] end = new byte[16]; + rand.nextBytes(start); + rand.nextBytes(end); + for (int i = 0; i < numRegions; i++) { + if (!regionQueue.isEmpty()) { + regions.add(regionQueue.poll()); + continue; + } + Bytes.putInt(start, 0, numRegions << 1); + Bytes.putInt(end, 0, (numRegions << 1) + 1); + HRegionInfo hri = new HRegionInfo(Bytes.toBytes("table" + i), start, end, false, regionId++); + regions.add(hri); + } + return regions; + } + + protected void returnRegions(List regions) { + regionQueue.addAll(regions); + } + + private Queue serverQueue = new LinkedList(); + + protected ServerAndLoad randomServer(final int numRegionsPerServer) { + if (!this.serverQueue.isEmpty()) { + ServerName sn = this.serverQueue.poll(); + return new ServerAndLoad(sn, numRegionsPerServer); + } + String host = "srv" + rand.nextInt(100000); + int port = rand.nextInt(60000); + long startCode = rand.nextLong(); + ServerName sn = new ServerName(host, port, startCode); + return new ServerAndLoad(sn, numRegionsPerServer); + } + + protected List randomServers(int numServers, int numRegionsPerServer) { + List servers = new ArrayList(numServers); + for (int i = 0; i < numServers; i++) { + servers.add(randomServer(numRegionsPerServer)); + } + return servers; + } + + protected void returnServer(ServerName server) { + serverQueue.add(server); + } + + protected void returnServers(List servers) { + this.serverQueue.addAll(servers); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java new file mode 100644 index 00000000000..0264f56ade0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestBaseLoadBalancer extends BalancerTestBase { + + private static LoadBalancer loadBalancer; + private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class); + + int[][] regionsAndServersMocks = new int[][] { + // { num regions, num servers } + new int[] { 0, 0 }, new int[] { 0, 1 }, new int[] { 1, 1 }, new int[] { 2, 1 }, + new int[] { 10, 1 }, new int[] { 1, 2 }, new int[] { 2, 2 }, new int[] { 3, 2 }, + new int[] { 1, 3 }, new int[] { 2, 3 }, new int[] { 3, 3 }, new int[] { 25, 3 }, + new int[] { 2, 10 }, new int[] { 2, 100 }, new int[] { 12, 10 }, new int[] { 12, 100 }, }; + + @BeforeClass + public static void beforeAllTests() throws Exception { + Configuration conf = HBaseConfiguration.create(); + loadBalancer = new MockBalancer(); + loadBalancer.setConf(conf); + } + + public static class MockBalancer extends BaseLoadBalancer { + + @Override + public List balanceCluster(Map> clusterState) { + return null; + } + + } + + /** + * Tests immediate assignment. + * + * Invariant is that all regions have an assignment. + * + * @throws Exception + */ + @Test + public void testImmediateAssignment() throws Exception { + for (int[] mock : regionsAndServersMocks) { + LOG.debug("testImmediateAssignment with " + mock[0] + " regions and " + mock[1] + " servers"); + List regions = randomRegions(mock[0]); + List servers = randomServers(mock[1], 0); + List list = getListOfServerNames(servers); + Map assignments = loadBalancer.immediateAssignment(regions, list); + assertImmediateAssignment(regions, list, assignments); + returnRegions(regions); + returnServers(list); + } + } + + /** + * All regions have an assignment. + * @param regions + * @param servers + * @param assignments + */ + private void assertImmediateAssignment(List regions, List servers, + Map assignments) { + for (HRegionInfo region : regions) { + assertTrue(assignments.containsKey(region)); + } + } + + /** + * Tests the bulk assignment used during cluster startup. + * + * Round-robin. Should yield a balanced cluster so same invariant as the load + * balancer holds, all servers holding either floor(avg) or ceiling(avg). 
+ * + * @throws Exception + */ + @Test + public void testBulkAssignment() throws Exception { + for (int[] mock : regionsAndServersMocks) { + LOG.debug("testBulkAssignment with " + mock[0] + " regions and " + mock[1] + " servers"); + List regions = randomRegions(mock[0]); + List servers = randomServers(mock[1], 0); + List list = getListOfServerNames(servers); + Map> assignments = + loadBalancer.roundRobinAssignment(regions, list); + float average = (float) regions.size() / servers.size(); + int min = (int) Math.floor(average); + int max = (int) Math.ceil(average); + if (assignments != null && !assignments.isEmpty()) { + for (List regionList : assignments.values()) { + assertTrue(regionList.size() == min || regionList.size() == max); + } + } + returnRegions(regions); + returnServers(list); + } + } + + /** + * Test the cluster startup bulk assignment which attempts to retain + * assignment info. + * @throws Exception + */ + @Test + public void testRetainAssignment() throws Exception { + // Test simple case where all same servers are there + List servers = randomServers(10, 10); + List regions = randomRegions(100); + Map existing = new TreeMap(); + for (int i = 0; i < regions.size(); i++) { + ServerName sn = servers.get(i % servers.size()).getServerName(); + // The old server would have had same host and port, but different + // start code! + ServerName snWithOldStartCode = + new ServerName(sn.getHostname(), sn.getPort(), sn.getStartcode() - 10); + existing.put(regions.get(i), snWithOldStartCode); + } + List listOfServerNames = getListOfServerNames(servers); + Map> assignment = + loadBalancer.retainAssignment(existing, listOfServerNames); + assertRetainedAssignment(existing, listOfServerNames, assignment); + + // Include two new servers that were not there before + List servers2 = new ArrayList(servers); + servers2.add(randomServer(10)); + servers2.add(randomServer(10)); + listOfServerNames = getListOfServerNames(servers2); + assignment = loadBalancer.retainAssignment(existing, listOfServerNames); + assertRetainedAssignment(existing, listOfServerNames, assignment); + + // Remove two of the servers that were previously there + List servers3 = new ArrayList(servers); + servers3.remove(0); + servers3.remove(0); + listOfServerNames = getListOfServerNames(servers3); + assignment = loadBalancer.retainAssignment(existing, listOfServerNames); + assertRetainedAssignment(existing, listOfServerNames, assignment); + } + + private List getListOfServerNames(final List sals) { + List list = new ArrayList(); + for (ServerAndLoad e : sals) { + list.add(e.getServerName()); + } + return list; + } + + /** + * Asserts a valid retained assignment plan. + *

+ * Must meet the following conditions: + * <ul> + * <li>Every input region has an assignment, and to an online server</li> + * <li>If a region had an existing assignment to a server with the same + * address as a currently online server, it will be assigned to it</li> + * </ul>
+ * @param existing + * @param servers + * @param assignment + */ + private void assertRetainedAssignment(Map existing, + List servers, Map> assignment) { + // Verify condition 1, every region assigned, and to online server + Set onlineServerSet = new TreeSet(servers); + Set assignedRegions = new TreeSet(); + for (Map.Entry> a : assignment.entrySet()) { + assertTrue("Region assigned to server that was not listed as online", + onlineServerSet.contains(a.getKey())); + for (HRegionInfo r : a.getValue()) + assignedRegions.add(r); + } + assertEquals(existing.size(), assignedRegions.size()); + + // Verify condition 2, if server had existing assignment, must have same + Set onlineHostNames = new TreeSet(); + for (ServerName s : servers) { + onlineHostNames.add(s.getHostname()); + } + + for (Map.Entry> a : assignment.entrySet()) { + ServerName assignedTo = a.getKey(); + for (HRegionInfo r : a.getValue()) { + ServerName address = existing.get(r); + if (address != null && onlineHostNames.contains(address.getHostname())) { + // this region was prevously assigned somewhere, and that + // host is still around, then it should be re-assigned on the + // same host + assertEquals(address.getHostname(), assignedTo.getHostname()); + } + } + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java new file mode 100644 index 00000000000..56e1001bac1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test the load balancer that is created by default. 
+ */ +@Category(MediumTests.class) +public class TestDefaultLoadBalancer extends BalancerTestBase { + private static final Log LOG = LogFactory.getLog(TestDefaultLoadBalancer.class); + + private static LoadBalancer loadBalancer; + + @BeforeClass + public static void beforeAllTests() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.set("hbase.regions.slop", "0"); + loadBalancer = new DefaultLoadBalancer(); + loadBalancer.setConf(conf); + } + + // int[testnum][servernumber] -> numregions + int[][] clusterStateMocks = new int[][] { + // 1 node + new int[] { 0 }, + new int[] { 1 }, + new int[] { 10 }, + // 2 node + new int[] { 0, 0 }, + new int[] { 2, 0 }, + new int[] { 2, 1 }, + new int[] { 2, 2 }, + new int[] { 2, 3 }, + new int[] { 2, 4 }, + new int[] { 1, 1 }, + new int[] { 0, 1 }, + new int[] { 10, 1 }, + new int[] { 14, 1432 }, + new int[] { 47, 53 }, + // 3 node + new int[] { 0, 1, 2 }, + new int[] { 1, 2, 3 }, + new int[] { 0, 2, 2 }, + new int[] { 0, 3, 0 }, + new int[] { 0, 4, 0 }, + new int[] { 20, 20, 0 }, + // 4 node + new int[] { 0, 1, 2, 3 }, + new int[] { 4, 0, 0, 0 }, + new int[] { 5, 0, 0, 0 }, + new int[] { 6, 6, 0, 0 }, + new int[] { 6, 2, 0, 0 }, + new int[] { 6, 1, 0, 0 }, + new int[] { 6, 0, 0, 0 }, + new int[] { 4, 4, 4, 7 }, + new int[] { 4, 4, 4, 8 }, + new int[] { 0, 0, 0, 7 }, + // 5 node + new int[] { 1, 1, 1, 1, 4 }, + // more nodes + new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }, + new int[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 }, new int[] { 6, 6, 5, 6, 6, 6, 6, 6, 6, 1 }, + new int[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 54 }, new int[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 55 }, + new int[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 56 }, new int[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 16 }, + new int[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 8 }, new int[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 9 }, + new int[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 10 }, new int[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 123 }, + new int[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 155 }, + new int[] { 0, 0, 144, 1, 1, 1, 1, 1123, 133, 138, 12, 1444 }, + new int[] { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 }, + new int[] { 1538, 1392, 1561, 1557, 1535, 1553, 1385, 1542, 1619 } }; + + /** + * Test the load balancing algorithm. 
+ * + * Invariant is that all servers should be hosting either floor(average) or + * ceiling(average) + * + * @throws Exception + */ + @Test + public void testBalanceCluster() throws Exception { + + for (int[] mockCluster : clusterStateMocks) { + Map> servers = mockClusterServers(mockCluster); + List list = convertToList(servers); + LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); + List plans = loadBalancer.balanceCluster(servers); + List balancedCluster = reconcile(list, plans); + LOG.info("Mock Balance : " + printMock(balancedCluster)); + assertClusterAsBalanced(balancedCluster); + for (Map.Entry> entry : servers.entrySet()) { + returnRegions(entry.getValue()); + returnServer(entry.getKey()); + } + } + + } + + @org.junit.Rule + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java new file mode 100644 index 00000000000..cfb434a31fc --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestStochasticLoadBalancer extends BalancerTestBase { + private static StochasticLoadBalancer loadBalancer; + private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class); + + @BeforeClass + public static void beforeAllTests() throws Exception { + Configuration conf = HBaseConfiguration.create(); + loadBalancer = new StochasticLoadBalancer(); + loadBalancer.setConf(conf); + } + + // int[testnum][servernumber] -> numregions + int[][] clusterStateMocks = new int[][]{ + // 1 node + new int[]{0}, + new int[]{1}, + new int[]{10}, + // 2 node + new int[]{0, 0}, + new int[]{2, 0}, + new int[]{2, 1}, + new int[]{2, 2}, + new int[]{2, 3}, + new int[]{2, 4}, + new int[]{1, 1}, + new int[]{0, 1}, + new int[]{10, 1}, + new int[]{514, 1432}, + new int[]{47, 53}, + // 3 node + new int[]{0, 1, 2}, + new int[]{1, 2, 3}, + new int[]{0, 2, 2}, + new int[]{0, 3, 0}, + new int[]{0, 4, 0}, + new int[]{20, 20, 0}, + // 4 node + new int[]{0, 1, 2, 3}, + new int[]{4, 0, 0, 0}, + new int[]{5, 0, 0, 0}, + new int[]{6, 6, 0, 0}, + new int[]{6, 2, 0, 0}, + new int[]{6, 1, 0, 0}, + new int[]{6, 0, 0, 0}, + new int[]{4, 4, 4, 7}, + new int[]{4, 4, 4, 8}, + new int[]{0, 0, 0, 7}, + // 5 node + new int[]{1, 1, 1, 1, 4}, + // more nodes + new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 10}, + new int[]{6, 6, 5, 6, 6, 6, 6, 6, 6, 1}, + new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 54}, + new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 55}, + new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 56}, + new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 16}, + new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 8}, + new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 9}, + new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 10}, + new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 123}, + new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 155}, + }; + + /** + * Test the load balancing algorithm. 
+ * + * Invariant is that all servers should be hosting either floor(average) or + * ceiling(average) + * + * @throws Exception + */ + @Test + public void testBalanceCluster() throws Exception { + + for (int[] mockCluster : clusterStateMocks) { + Map> servers = mockClusterServers(mockCluster); + List list = convertToList(servers); + LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); + List plans = loadBalancer.balanceCluster(servers); + List balancedCluster = reconcile(list, plans); + LOG.info("Mock Balance : " + printMock(balancedCluster)); + assertClusterAsBalanced(balancedCluster); + for (Map.Entry> entry : servers.entrySet()) { + returnRegions(entry.getValue()); + returnServer(entry.getKey()); + } + } + + } + + @Test + public void testSkewCost() { + for (int[] mockCluster : clusterStateMocks) { + double cost = loadBalancer.computeSkewLoadCost(mockClusterServers(mockCluster)); + assertTrue(cost >= 0); + assertTrue(cost <= 1.01); + } + assertEquals(1, + loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 0, 0, 0, 1 })), 0.01); + assertEquals(.75, + loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 0, 0, 1, 1 })), 0.01); + assertEquals(.5, + loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 0, 1, 1, 1 })), 0.01); + assertEquals(.25, + loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 1, 1, 1, 1 })), 0.01); + assertEquals(0, + loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 1, 1, 1, 1, 1 })), 0.01); + assertEquals(0, + loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 10, 10, 10, 10, 10 })), 0.01); + } + + @Test + public void testTableSkewCost() { + for (int[] mockCluster : clusterStateMocks) { + double cost = loadBalancer.computeTableSkewLoadCost(mockClusterServers(mockCluster)); + assertTrue(cost >= 0); + assertTrue(cost <= 1.01); + } + } + + @Test + public void testCostFromStats() { + DescriptiveStatistics statOne = new DescriptiveStatistics(); + for (int i =0; i < 100; i++) { + statOne.addValue(10); + } + assertEquals(0, loadBalancer.costFromStats(statOne), 0.01); + + DescriptiveStatistics statTwo = new DescriptiveStatistics(); + for (int i =0; i < 100; i++) { + statTwo.addValue(0); + } + statTwo.addValue(100); + assertEquals(1, loadBalancer.costFromStats(statTwo), 0.01); + + DescriptiveStatistics statThree = new DescriptiveStatistics(); + for (int i =0; i < 100; i++) { + statThree.addValue(0); + statThree.addValue(100); + } + assertEquals(0.5, loadBalancer.costFromStats(statThree), 0.01); + } +} diff --git a/pom.xml b/pom.xml index fd535b6403c..9115887326c 100644 --- a/pom.xml +++ b/pom.xml @@ -794,6 +794,11 @@ commons-logging ${commons-logging.version}
+ + org.apache.commons + commons-math + ${commons-math.version} + log4j log4j @@ -1012,12 +1017,6 @@ ${mockito-all.version} test - - org.apache.commons - commons-math - ${commons-math.version} - test -
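For reference, the skew-cost values asserted in testSkewCost follow directly from costFromStats() and scale() as added above. A minimal self-contained sketch that reproduces the 0.75 expectation for per-server region counts {0, 0, 0, 1, 1}, assuming commons-math 2.x on the classpath (the class name is hypothetical):

    import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;

    public class SkewCostByHand {
      public static void main(String[] args) {
        // Region counts per server, mirroring the {0, 0, 0, 1, 1} case in testSkewCost().
        double[] regionsPerServer = { 0, 0, 0, 1, 1 };
        DescriptiveStatistics stats = new DescriptiveStatistics();
        for (double count : regionsPerServer) {
          stats.addValue(count);
        }

        // Same formula as StochasticLoadBalancer.costFromStats():
        double mean = stats.getMean();                                      // 0.4
        // Worst case assumes one server holds every region and the rest hold none.
        double max = ((stats.getN() - 1) * mean) + (stats.getSum() - mean); // 3.2
        double totalCost = 0;
        for (double count : stats.getValues()) {
          totalCost += Math.abs(mean - count);                              // 2.4 in total
        }

        // scale(0, max, totalCost) -> 2.4 / 3.2 = 0.75, the value the test asserts.
        double cost = Math.max(0d, Math.min(1d, totalCost / max));
        System.out.println(cost);
      }
    }
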