diff --git a/CHANGES.txt b/CHANGES.txt index 5df2cae932a..697604df1ef 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -395,6 +395,7 @@ Release 0.91.0 - Unreleased HBASE-4237 Directly remove the call being handled from the map of outstanding RPCs (Benoit Sigoure) HBASE-4199 blockCache summary - backend (Doug Meil) + HBASE-4240 Allow Loadbalancer to be pluggable TASKS HBASE-3559 Move report of split to master OFF the heartbeat channel diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 49d1e7c0810..29858b2c13b 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; -import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; +import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler; import org.apache.hadoop.hbase.master.handler.DisableTableHandler; import org.apache.hadoop.hbase.master.handler.EnableTableHandler; @@ -101,6 +101,8 @@ public class AssignmentManager extends ZooKeeperListener { private TimeoutMonitor timeoutMonitor; + private LoadBalancer balancer; + /* * Maximum times we recurse an assignment. See below in {@link #assign()}. 
*/ @@ -172,6 +174,7 @@ public class AssignmentManager extends ZooKeeperListener { this.zkTable = new ZKTable(this.master.getZooKeeper()); this.maximumAssignmentAttempts = this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); + this.balancer = LoadBalancerFactory.getLoadBalancer(conf); } /** @@ -1364,7 +1367,7 @@ public class AssignmentManager extends ZooKeeperListener { if (serverToExclude != null) servers.remove(serverToExclude); if (servers.isEmpty()) return null; RegionPlan randomPlan = new RegionPlan(state.getRegion(), null, - LoadBalancer.randomAssignment(servers)); + balancer.randomAssignment(servers)); boolean newPlan = false; RegionPlan existingPlan = null; synchronized (this.regionPlans) { @@ -1564,7 +1567,7 @@ public class AssignmentManager extends ZooKeeperListener { return; Map> bulkPlan = null; // Generate a round-robin bulk assignment plan - bulkPlan = LoadBalancer.roundRobinAssignment(regions, servers); + bulkPlan = balancer.roundRobinAssignment(regions, servers); LOG.info("Bulk assigning " + regions.size() + " region(s) round-robin across " + servers.size() + " server(s)"); // Use fixed count thread pool assigning. 
@@ -1598,7 +1601,7 @@ public class AssignmentManager extends ZooKeeperListener { Map> bulkPlan = null; if (retainAssignment) { // Reuse existing assignment info - bulkPlan = LoadBalancer.retainAssignment(allRegions, servers); + bulkPlan = balancer.retainAssignment(allRegions, servers); } else { // assign regions in round-robin fashion assignUserRegions(new ArrayList(allRegions.keySet()), servers); diff --git a/src/main/java/org/apache/hadoop/hbase/master/DefaultLoadBalancer.java b/src/main/java/org/apache/hadoop/hbase/master/DefaultLoadBalancer.java new file mode 100644 index 00000000000..d6083274f32 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/master/DefaultLoadBalancer.java @@ -0,0 +1,723 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Random; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; + +import com.google.common.collect.MinMaxPriorityQueue; + +/** + * Makes decisions about the placement and movement of Regions across + * RegionServers. + * + *

Cluster-wide load balancing will occur only when there are no regions in + * transition and according to a fixed period of a time using {@link #balanceCluster(Map)}. + * + *

Inline region placement with {@link #immediateAssignment} can be used when + * the Master needs to handle closed regions that it currently does not have + * a destination set for. This can happen during master failover. + * + *

On cluster startup, bulk assignment can be used to determine + * locations for all Regions in a cluster. + * + *

This classes produces plans for the {@link AssignmentManager} to execute. + */ +public class DefaultLoadBalancer implements LoadBalancer { + private static final Log LOG = LogFactory.getLog(LoadBalancer.class); + private static final Random RANDOM = new Random(System.currentTimeMillis()); + // slop for regions + private float slop; + private Configuration config; + private ClusterStatus status; + private MasterServices services; + + public void setClusterStatus(ClusterStatus st) { + this.status = st; + } + + public void setMasterServices(MasterServices masterServices) { + this.services = masterServices; + } + + @Override + public void setConf(Configuration conf) { + this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2); + if (slop < 0) slop = 0; + else if (slop > 1) slop = 1; + this.config = conf; + } + + @Override + public Configuration getConf() { + return this.config; + } + + /* + * The following comparator assumes that RegionId from HRegionInfo can + * represent the age of the region - larger RegionId means the region + * is younger. + * This comparator is used in balanceCluster() to account for the out-of-band + * regions which were assigned to the server after some other region server + * crashed. 
+ */ + private class RegionInfoComparator implements Comparator { + @Override + public int compare(HRegionInfo l, HRegionInfo r) { + long diff = r.getRegionId() - l.getRegionId(); + if (diff < 0) return -1; + if (diff > 0) return 1; + return 0; + } + } + + + RegionInfoComparator riComparator = new RegionInfoComparator(); + + private class RegionPlanComparator implements Comparator { + @Override + public int compare(RegionPlan l, RegionPlan r) { + long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId(); + if (diff < 0) return -1; + if (diff > 0) return 1; + return 0; + } + } + + RegionPlanComparator rpComparator = new RegionPlanComparator(); + + /** + * Generate a global load balancing plan according to the specified map of + * server information to the most loaded regions of each server. + * + * The load balancing invariant is that all servers are within 1 region of the + * average number of regions per server. If the average is an integer number, + * all servers will be balanced to the average. Otherwise, all servers will + * have either floor(average) or ceiling(average) regions. + * + * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that + * we can fetch from both ends of the queue. + * At the beginning, we check whether there was empty region server + * just discovered by Master. If so, we alternately choose new / old + * regions from head / tail of regionsToMove, respectively. This alternation + * avoids clustering young regions on the newly discovered region server. + * Otherwise, we choose new regions from head of regionsToMove. + * + * Another improvement from HBASE-3609 is that we assign regions from + * regionsToMove to underloaded servers in round-robin fashion. + * Previously one underloaded server would be filled before we move onto + * the next underloaded server, leading to clustering of young regions. 
+ * + * Finally, we randomly shuffle underloaded servers so that they receive + * offloaded regions relatively evenly across calls to balanceCluster(). + * + * The algorithm is currently implemented as such: + * + *

    + *
  1. Determine the two valid numbers of regions each server should have, + * MIN=floor(average) and MAX=ceiling(average). + * + *
  2. Iterate down the most loaded servers, shedding regions from each so + * each server hosts exactly MAX regions. Stop once you reach a + * server that already has <= MAX regions. + *

    + * Order the regions to move from most recent to least. + * + *

  3. Iterate down the least loaded servers, assigning regions so each server + * has exactly MIN regions. Stop once you reach a server that + * already has >= MIN regions. + * + * Regions being assigned to underloaded servers are those that were shed + * in the previous step. It is possible that there were not enough + * regions shed to fill each underloaded server to MIN. If so we + * end up with a number of regions required to do so, neededRegions. + * + * It is also possible that we were able to fill each underloaded but ended + * up with regions that were unassigned from overloaded servers but that + * still do not have assignment. + * + * If neither of these conditions hold (no regions needed to fill the + * underloaded servers, no regions leftover from overloaded servers), + * we are done and return. Otherwise we handle these cases below. + * + *
  4. If neededRegions is non-zero (still have underloaded servers), + * we iterate the most loaded servers again, shedding a single server from + * each (this brings them from having MAX regions to having + * MIN regions). + * + *
  5. We now definitely have more regions that need assignment, either from + * the previous step or from the original shedding from overloaded servers. + * Iterate the least loaded servers filling each to MIN. + * + *
  6. If we still have more regions that need assignment, again iterate the + * least loaded servers, this time giving each one (filling them to + * MAX) until we run out. + * + *
  7. All servers will now either host MIN or MAX regions. + * + * In addition, any server hosting >= MAX regions is guaranteed + * to end up with MAX regions at the end of the balancing. This + * ensures the minimal number of regions possible are moved. + *
+ * + * TODO: We can at-most reassign the number of regions away from a particular + * server to be how many they report as most loaded. + * Should we just keep all assignment in memory? Any objections? + * Does this mean we need HeapSize on HMaster? Or just careful monitor? + * (current thinking is we will hold all assignments in memory) + * + * @param clusterState Map of regionservers and their load/region information to + * a list of their most loaded regions + * @return a list of regions to be moved, including source and destination, + * or null if cluster is already balanced + */ + public List balanceCluster( + Map> clusterState) { + boolean emptyRegionServerPresent = false; + long startTime = System.currentTimeMillis(); + + int numServers = clusterState.size(); + if (numServers == 0) { + LOG.debug("numServers=0 so skipping load balancing"); + return null; + } + NavigableMap> serversByLoad = + new TreeMap>(); + int numRegions = 0; + StringBuilder strBalanceParam = new StringBuilder("Server information: "); + // Iterate so we can count regions as we build the map + for (Map.Entry> server: clusterState.entrySet()) { + List regions = server.getValue(); + int sz = regions.size(); + if (sz == 0) emptyRegionServerPresent = true; + numRegions += sz; + serversByLoad.put(new ServerAndLoad(server.getKey(), sz), regions); + strBalanceParam.append(server.getKey().getServerName()).append("="). 
+ append(server.getValue().size()).append(", "); + } + strBalanceParam.delete(strBalanceParam.length() - 2, + strBalanceParam.length()); + LOG.debug(strBalanceParam.toString()); + + // Check if we even need to do any load balancing + float average = (float)numRegions / numServers; // for logging + // HBASE-3681 check sloppiness first + int floor = (int) Math.floor(average * (1 - slop)); + int ceiling = (int) Math.ceil(average * (1 + slop)); + if (serversByLoad.lastKey().getLoad() <= ceiling && + serversByLoad.firstKey().getLoad() >= floor) { + // Skipped because no server outside (min,max) range + LOG.info("Skipping load balancing because balanced cluster; " + + "servers=" + numServers + " " + + "regions=" + numRegions + " average=" + average + " " + + "mostloaded=" + serversByLoad.lastKey().getLoad() + + " leastloaded=" + serversByLoad.firstKey().getLoad()); + return null; + } + int min = numRegions / numServers; + int max = numRegions % numServers == 0 ? min : min + 1; + + // Using to check banance result. 
+ strBalanceParam.delete(0, strBalanceParam.length()); + strBalanceParam.append("Balance parameter: numRegions=").append(numRegions) + .append(", numServers=").append(numServers).append(", max=").append(max) + .append(", min=").append(min); + LOG.debug(strBalanceParam.toString()); + + // Balance the cluster + // TODO: Look at data block locality or a more complex load to do this + MinMaxPriorityQueue regionsToMove = + MinMaxPriorityQueue.orderedBy(rpComparator).create(); + List regionsToReturn = new ArrayList(); + + // Walk down most loaded, pruning each to the max + int serversOverloaded = 0; + // flag used to fetch regions from head and tail of list, alternately + boolean fetchFromTail = false; + Map serverBalanceInfo = + new TreeMap(); + for (Map.Entry> server: + serversByLoad.descendingMap().entrySet()) { + ServerAndLoad sal = server.getKey(); + int regionCount = sal.getLoad(); + if (regionCount <= max) { + serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0)); + break; + } + serversOverloaded++; + List regions = server.getValue(); + int numToOffload = Math.min(regionCount - max, regions.size()); + // account for the out-of-band regions which were assigned to this server + // after some other region server crashed + Collections.sort(regions, riComparator); + int numTaken = 0; + for (int i = 0; i <= numToOffload; ) { + HRegionInfo hri = regions.get(i); // fetch from head + if (fetchFromTail) { + hri = regions.get(regions.size() - 1 - i); + } + i++; + // Don't rebalance meta regions. 
+ if (hri.isMetaRegion()) continue; + regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null)); + numTaken++; + if (numTaken >= numToOffload) break; + // fetch in alternate order if there is new region server + if (emptyRegionServerPresent) { + fetchFromTail = !fetchFromTail; + } + } + serverBalanceInfo.put(sal.getServerName(), + new BalanceInfo(numToOffload, (-1)*numTaken)); + } + int totalNumMoved = regionsToMove.size(); + + // Walk down least loaded, filling each to the min + int neededRegions = 0; // number of regions needed to bring all up to min + fetchFromTail = false; + + Map underloadedServers = new HashMap(); + for (Map.Entry> server: + serversByLoad.entrySet()) { + int regionCount = server.getKey().getLoad(); + if (regionCount >= min) { + break; + } + underloadedServers.put(server.getKey().getServerName(), min - regionCount); + } + // number of servers that get new regions + int serversUnderloaded = underloadedServers.size(); + int incr = 1; + List sns = + Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded])); + Collections.shuffle(sns, RANDOM); + while (regionsToMove.size() > 0) { + int cnt = 0; + int i = incr > 0 ? 
0 : underloadedServers.size()-1; + for (; i >= 0 && i < underloadedServers.size(); i += incr) { + if (regionsToMove.isEmpty()) break; + ServerName si = sns.get(i); + int numToTake = underloadedServers.get(si); + if (numToTake == 0) continue; + + addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn); + if (emptyRegionServerPresent) { + fetchFromTail = !fetchFromTail; + } + + underloadedServers.put(si, numToTake-1); + cnt++; + BalanceInfo bi = serverBalanceInfo.get(si); + if (bi == null) { + bi = new BalanceInfo(0, 0); + serverBalanceInfo.put(si, bi); + } + bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1); + } + if (cnt == 0) break; + // iterates underloadedServers in the other direction + incr = -incr; + } + for (Integer i : underloadedServers.values()) { + // If we still want to take some, increment needed + neededRegions += i; + } + + // If none needed to fill all to min and none left to drain all to max, + // we are done + if (neededRegions == 0 && regionsToMove.isEmpty()) { + long endTime = System.currentTimeMillis(); + LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " + + "Moving " + totalNumMoved + " regions off of " + + serversOverloaded + " overloaded servers onto " + + serversUnderloaded + " less loaded servers"); + return regionsToReturn; + } + + // Need to do a second pass. + // Either more regions to assign out or servers that are still underloaded + + // If we need more to fill min, grab one from each most loaded until enough + if (neededRegions != 0) { + // Walk down most loaded, grabbing one from each until we get enough + for (Map.Entry> server : + serversByLoad.descendingMap().entrySet()) { + BalanceInfo balanceInfo = + serverBalanceInfo.get(server.getKey().getServerName()); + int idx = + balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload(); + if (idx >= server.getValue().size()) break; + HRegionInfo region = server.getValue().get(idx); + if (region.isMetaRegion()) continue; // Don't move meta regions. 
+ regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null)); + totalNumMoved++; + if (--neededRegions == 0) { + // No more regions needed, done shedding + break; + } + } + } + + // Now we have a set of regions that must be all assigned out + // Assign each underloaded up to the min, then if leftovers, assign to max + + // Walk down least loaded, assigning to each to fill up to min + for (Map.Entry> server : + serversByLoad.entrySet()) { + int regionCount = server.getKey().getLoad(); + if (regionCount >= min) break; + BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName()); + if(balanceInfo != null) { + regionCount += balanceInfo.getNumRegionsAdded(); + } + if(regionCount >= min) { + continue; + } + int numToTake = min - regionCount; + int numTaken = 0; + while(numTaken < numToTake && 0 < regionsToMove.size()) { + addRegionPlan(regionsToMove, fetchFromTail, + server.getKey().getServerName(), regionsToReturn); + numTaken++; + if (emptyRegionServerPresent) { + fetchFromTail = !fetchFromTail; + } + } + } + + // If we still have regions to dish out, assign underloaded to max + if (0 < regionsToMove.size()) { + for (Map.Entry> server : + serversByLoad.entrySet()) { + int regionCount = server.getKey().getLoad(); + if(regionCount >= max) { + break; + } + addRegionPlan(regionsToMove, fetchFromTail, + server.getKey().getServerName(), regionsToReturn); + if (emptyRegionServerPresent) { + fetchFromTail = !fetchFromTail; + } + if (regionsToMove.isEmpty()) { + break; + } + } + } + + long endTime = System.currentTimeMillis(); + + if (!regionsToMove.isEmpty() || neededRegions != 0) { + // Emit data so can diagnose how balancer went astray. 
+ LOG.warn("regionsToMove=" + totalNumMoved + + ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded + + ", serversUnderloaded=" + serversUnderloaded); + StringBuilder sb = new StringBuilder(); + for (Map.Entry> e: clusterState.entrySet()) { + if (sb.length() > 0) sb.append(", "); + sb.append(e.getKey().toString()); + sb.append(" "); + sb.append(e.getValue().size()); + } + LOG.warn("Input " + sb.toString()); + } + + // All done! + LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " + + "Moving " + totalNumMoved + " regions off of " + + serversOverloaded + " overloaded servers onto " + + serversUnderloaded + " less loaded servers"); + + return regionsToReturn; + } + + /** + * Add a region from the head or tail to the List of regions to return. + */ + void addRegionPlan(final MinMaxPriorityQueue regionsToMove, + final boolean fetchFromTail, final ServerName sn, List regionsToReturn) { + RegionPlan rp = null; + if (!fetchFromTail) rp = regionsToMove.remove(); + else rp = regionsToMove.removeLast(); + rp.setDestination(sn); + regionsToReturn.add(rp); + } + + /** + * Stores additional per-server information about the regions added/removed + * during the run of the balancing algorithm. + * + * For servers that shed regions, we need to track which regions we have + * already shed. nextRegionForUnload contains the index in the list + * of regions on the server that is the next to be shed. 
+ */ + private static class BalanceInfo { + + private final int nextRegionForUnload; + private int numRegionsAdded; + + public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) { + this.nextRegionForUnload = nextRegionForUnload; + this.numRegionsAdded = numRegionsAdded; + } + + public int getNextRegionForUnload() { + return nextRegionForUnload; + } + + public int getNumRegionsAdded() { + return numRegionsAdded; + } + + public void setNumRegionsAdded(int numAdded) { + this.numRegionsAdded = numAdded; + } + } + + /** + * Generates a bulk assignment plan to be used on cluster startup using a + * simple round-robin assignment. + *

+ * Takes a list of all the regions and all the servers in the cluster and + * returns a map of each server to the regions that it should be assigned. + *

+ * Currently implemented as a round-robin assignment. Same invariant as + * load balancing, all servers holding floor(avg) or ceiling(avg). + * + * TODO: Use block locations from HDFS to place regions with their blocks + * + * @param regions all regions + * @param servers all servers + * @return map of server to the regions it should take, or null if no + * assignment is possible (ie. no regions or no servers) + */ + public Map> roundRobinAssignment( + List regions, List servers) { + if (regions.isEmpty() || servers.isEmpty()) { + return null; + } + Map> assignments = + new TreeMap>(); + int numRegions = regions.size(); + int numServers = servers.size(); + int max = (int)Math.ceil((float)numRegions/numServers); + int serverIdx = 0; + if (numServers > 1) { + serverIdx = RANDOM.nextInt(numServers); + } + int regionIdx = 0; + for (int j = 0; j < numServers; j++) { + ServerName server = servers.get((j + serverIdx) % numServers); + List serverRegions = new ArrayList(max); + for (int i=regionIdx; i + * Takes a map of all regions to their existing assignment from META. Also + * takes a list of online servers for regions to be assigned to. Attempts to + * retain all assignment, so in some instances initial assignment will not be + * completely balanced. + *

+ * Any leftover regions without an existing server to be assigned to will be + * assigned randomly to available servers. + * @param regions regions and existing assignment from meta + * @param servers available servers + * @return map of servers and regions to be assigned to them + */ + public Map> retainAssignment( + Map regions, List servers) { + Map> assignments = + new TreeMap>(); + for (ServerName server : servers) { + assignments.put(server, new ArrayList()); + } + for (Map.Entry region : regions.entrySet()) { + ServerName sn = region.getValue(); + if (sn != null && servers.contains(sn)) { + assignments.get(sn).add(region.getKey()); + } else { + int size = assignments.size(); + assignments.get(servers.get(RANDOM.nextInt(size))).add(region.getKey()); + } + } + return assignments; + } + + /** + * Returns an ordered list of hosts that are hosting the blocks for this + * region. The weight of each host is the sum of the block lengths of all + * files on that host, so the first host in the list is the server which + * holds the most bytes of the given region's HFiles. + * + * @param fs the filesystem + * @param region region + * @return ordered list of hosts holding blocks of the specified region + */ + @SuppressWarnings("unused") + private List getTopBlockLocations(FileSystem fs, + HRegionInfo region) { + List topServerNames = null; + try { + HTableDescriptor tableDescriptor = getTableDescriptor( + region.getTableName()); + if (tableDescriptor != null) { + HDFSBlocksDistribution blocksDistribution = + HRegion.computeHDFSBlocksDistribution(config, tableDescriptor, + region.getEncodedName()); + List topHosts = blocksDistribution.getTopHosts(); + topServerNames = mapHostNameToServerName(topHosts); + } + } catch (IOException ioe) { + LOG.debug("IOException during HDFSBlocksDistribution computation. 
for " + + "region = " + region.getEncodedName() , ioe); + } + + return topServerNames; + } + + /** + * return HTableDescriptor for a given tableName + * @param tableName the table name + * @return HTableDescriptor + * @throws IOException + */ + private HTableDescriptor getTableDescriptor(byte[] tableName) + throws IOException { + HTableDescriptor tableDescriptor = null; + try { + if ( this.services != null) + { + tableDescriptor = this.services.getTableDescriptors(). + get(Bytes.toString(tableName)); + } + } catch (TableExistsException tee) { + LOG.debug("TableExistsException during getTableDescriptors." + + " Current table name = " + tableName , tee); + } catch (FileNotFoundException fnfe) { + LOG.debug("FileNotFoundException during getTableDescriptors." + + " Current table name = " + tableName , fnfe); + } + + return tableDescriptor; + } + + /** + * Map hostname to ServerName, The output ServerName list will have the same + * order as input hosts. + * @param hosts the list of hosts + * @return ServerName list + */ + private List mapHostNameToServerName(List hosts) { + if ( hosts == null || status == null) { + return null; + } + + List topServerNames = new ArrayList(); + Collection regionServers = status.getServers(); + + // create a mapping from hostname to ServerName for fast lookup + HashMap hostToServerName = + new HashMap(); + for (ServerName sn : regionServers) { + hostToServerName.put(sn.getHostname(), sn); + } + + for (String host : hosts ) { + ServerName sn = hostToServerName.get(host); + // it is possible that HDFS is up ( thus host is valid ), + // but RS is down ( thus sn is null ) + if (sn != null) { + topServerNames.add(sn); + } + } + return topServerNames; + } + + + /** + * Generates an immediate assignment plan to be used by a new master for + * regions in transition that do not have an already known destination. + * + * Takes a list of regions that need immediate assignment and a list of + * all available servers. 
Returns a map of regions to the server they + * should be assigned to. + * + * This method will return quickly and does not do any intelligent + * balancing. The goal is to make a fast decision not the best decision + * possible. + * + * Currently this is random. + * + * @param regions + * @param servers + * @return map of regions to the server it should be assigned to + */ + public Map immediateAssignment( + List regions, List servers) { + Map assignments = + new TreeMap(); + for(HRegionInfo region : regions) { + assignments.put(region, servers.get(RANDOM.nextInt(servers.size()))); + } + return assignments; + } + + public ServerName randomAssignment(List servers) { + if (servers == null || servers.isEmpty()) { + LOG.warn("Wanted to do random assignment but no servers to assign to"); + return null; + } + return servers.get(RANDOM.nextInt(servers.size())); + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 97841953b27..92d5dbb857c 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -63,7 +63,7 @@ import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; +import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.handler.DeleteTableHandler; import org.apache.hadoop.hbase.master.handler.DisableTableHandler; import org.apache.hadoop.hbase.master.handler.EnableTableHandler; @@ -354,7 +354,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.assignmentManager = new AssignmentManager(this, serverManager, this.catalogTracker, this.executorService); - this.balancer = new LoadBalancer(conf); + this.balancer = 
LoadBalancerFactory.getLoadBalancer(conf); zooKeeper.registerListenerFirst(assignmentManager); this.regionServerTracker = new RegionServerTracker(zooKeeper, this, diff --git a/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java index 05460b649eb..ba0d422265c 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java +++ b/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java @@ -1,5 +1,5 @@ /** - * Copyright 2010 The Apache Software Foundation + * Copyright 2011 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,34 +19,13 @@ */ package org.apache.hadoop.hbase.master; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; + import java.util.List; import java.util.Map; -import java.util.NavigableMap; -import java.util.Random; -import java.util.TreeMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HDFSBlocksDistribution; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.util.Bytes; - -import com.google.common.collect.MinMaxPriorityQueue; /** * Makes decisions about 
the placement and movement of Regions across @@ -64,764 +43,56 @@ import com.google.common.collect.MinMaxPriorityQueue; * *

This classes produces plans for the {@link AssignmentManager} to execute. */ -public class LoadBalancer { - private static final Log LOG = LogFactory.getLog(LoadBalancer.class); - private static final Random RANDOM = new Random(System.currentTimeMillis()); - // slop for regions - private float slop; - private Configuration config; - private ClusterStatus status; - private MasterServices services; +public interface LoadBalancer extends Configurable { - LoadBalancer(Configuration conf) { - this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2); - if (slop < 0) slop = 0; - else if (slop > 1) slop = 1; - this.config = conf; - } - - public void setClusterStatus(ClusterStatus st) { - this.status = st; - } - - public void setMasterServices(MasterServices masterServices) { - this.services = masterServices; - } - - /* - * The following comparator assumes that RegionId from HRegionInfo can - * represent the age of the region - larger RegionId means the region - * is younger. - * This comparator is used in balanceCluster() to account for the out-of-band - * regions which were assigned to the server after some other region server - * crashed. + /** + * Set the current cluster status. 
This allows a LoadBalancer to map host name to a server + * @param st */ - static class RegionInfoComparator implements Comparator { - @Override - public int compare(HRegionInfo l, HRegionInfo r) { - long diff = r.getRegionId() - l.getRegionId(); - if (diff < 0) return -1; - if (diff > 0) return 1; - return 0; - } - } - static RegionInfoComparator riComparator = new RegionInfoComparator(); - - static class RegionPlanComparator implements Comparator { - @Override - public int compare(RegionPlan l, RegionPlan r) { - long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId(); - if (diff < 0) return -1; - if (diff > 0) return 1; - return 0; - } - } - static RegionPlanComparator rpComparator = new RegionPlanComparator(); + public void setClusterStatus(ClusterStatus st); + /** - * Data structure that holds servername and 'load'. + * Set the master service. + * @param masterServices */ - static class ServerAndLoad implements Comparable { - private final ServerName sn; - private final int load; - ServerAndLoad(final ServerName sn, final int load) { - this.sn = sn; - this.load = load; - } - - ServerName getServerName() {return this.sn;} - int getLoad() {return this.load;} - - @Override - public int compareTo(ServerAndLoad other) { - int diff = this.load - other.load; - return diff != 0? diff: this.sn.compareTo(other.getServerName()); - } - } + public void setMasterServices(MasterServices masterServices); /** - * Generate a global load balancing plan according to the specified map of - * server information to the most loaded regions of each server. - * - * The load balancing invariant is that all servers are within 1 region of the - * average number of regions per server. If the average is an integer number, - * all servers will be balanced to the average. Otherwise, all servers will - * have either floor(average) or ceiling(average) regions. 
- * - * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that - * we can fetch from both ends of the queue. - * At the beginning, we check whether there was empty region server - * just discovered by Master. If so, we alternately choose new / old - * regions from head / tail of regionsToMove, respectively. This alternation - * avoids clustering young regions on the newly discovered region server. - * Otherwise, we choose new regions from head of regionsToMove. - * - * Another improvement from HBASE-3609 is that we assign regions from - * regionsToMove to underloaded servers in round-robin fashion. - * Previously one underloaded server would be filled before we move onto - * the next underloaded server, leading to clustering of young regions. - * - * Finally, we randomly shuffle underloaded servers so that they receive - * offloaded regions relatively evenly across calls to balanceCluster(). - * - * The algorithm is currently implemented as such: - * - *

    - *
  1. Determine the two valid numbers of regions each server should have, - * MIN=floor(average) and MAX=ceiling(average). - * - *
  2. Iterate down the most loaded servers, shedding regions from each so - * each server hosts exactly MAX regions. Stop once you reach a - * server that already has <= MAX regions. - *

    - * Order the regions to move from most recent to least. - * - *

  3. Iterate down the least loaded servers, assigning regions so each server - * has exactly MIN regions. Stop once you reach a server that - * already has >= MIN regions. - * - * Regions being assigned to underloaded servers are those that were shed - * in the previous step. It is possible that there were not enough - * regions shed to fill each underloaded server to MIN. If so we - * end up with a number of regions required to do so, neededRegions. - * - * It is also possible that we were able to fill each underloaded but ended - * up with regions that were unassigned from overloaded servers but that - * still do not have assignment. - * - * If neither of these conditions hold (no regions needed to fill the - * underloaded servers, no regions leftover from overloaded servers), - * we are done and return. Otherwise we handle these cases below. - * - *
  4. If neededRegions is non-zero (still have underloaded servers), - * we iterate the most loaded servers again, shedding a single server from - * each (this brings them from having MAX regions to having - * MIN regions). - * - *
  5. We now definitely have more regions that need assignment, either from - * the previous step or from the original shedding from overloaded servers. - * Iterate the least loaded servers filling each to MIN. - * - *
  6. If we still have more regions that need assignment, again iterate the - * least loaded servers, this time giving each one (filling them to - * MAX) until we run out. - * - *
  7. All servers will now either host MIN or MAX regions. - * - * In addition, any server hosting >= MAX regions is guaranteed - * to end up with MAX regions at the end of the balancing. This - * ensures the minimal number of regions possible are moved. - *
- * - * TODO: We can at-most reassign the number of regions away from a particular - * server to be how many they report as most loaded. - * Should we just keep all assignment in memory? Any objections? - * Does this mean we need HeapSize on HMaster? Or just careful monitor? - * (current thinking is we will hold all assignments in memory) - * - * @param clusterState Map of regionservers and their load/region information to - * a list of their most loaded regions - * @return a list of regions to be moved, including source and destination, - * or null if cluster is already balanced + * Perform the major balance operation + * @param clusterState + * @return */ - public List balanceCluster( - Map> clusterState) { - boolean emptyRegionServerPresent = false; - long startTime = System.currentTimeMillis(); - - int numServers = clusterState.size(); - if (numServers == 0) { - LOG.debug("numServers=0 so skipping load balancing"); - return null; - } - NavigableMap> serversByLoad = - new TreeMap>(); - int numRegions = 0; - StringBuilder strBalanceParam = new StringBuilder("Server information: "); - // Iterate so we can count regions as we build the map - for (Map.Entry> server: clusterState.entrySet()) { - List regions = server.getValue(); - int sz = regions.size(); - if (sz == 0) emptyRegionServerPresent = true; - numRegions += sz; - serversByLoad.put(new ServerAndLoad(server.getKey(), sz), regions); - strBalanceParam.append(server.getKey().getServerName()).append("="). 
- append(server.getValue().size()).append(", "); - } - strBalanceParam.delete(strBalanceParam.length() - 2, - strBalanceParam.length()); - LOG.debug(strBalanceParam.toString()); - - // Check if we even need to do any load balancing - float average = (float)numRegions / numServers; // for logging - // HBASE-3681 check sloppiness first - int floor = (int) Math.floor(average * (1 - slop)); - int ceiling = (int) Math.ceil(average * (1 + slop)); - if (serversByLoad.lastKey().getLoad() <= ceiling && - serversByLoad.firstKey().getLoad() >= floor) { - // Skipped because no server outside (min,max) range - LOG.info("Skipping load balancing because balanced cluster; " + - "servers=" + numServers + " " + - "regions=" + numRegions + " average=" + average + " " + - "mostloaded=" + serversByLoad.lastKey().getLoad() + - " leastloaded=" + serversByLoad.firstKey().getLoad()); - return null; - } - int min = numRegions / numServers; - int max = numRegions % numServers == 0 ? min : min + 1; - - // Using to check banance result. 
- strBalanceParam.delete(0, strBalanceParam.length()); - strBalanceParam.append("Balance parameter: numRegions=").append(numRegions) - .append(", numServers=").append(numServers).append(", max=").append(max) - .append(", min=").append(min); - LOG.debug(strBalanceParam.toString()); - - // Balance the cluster - // TODO: Look at data block locality or a more complex load to do this - MinMaxPriorityQueue regionsToMove = - MinMaxPriorityQueue.orderedBy(rpComparator).create(); - List regionsToReturn = new ArrayList(); - - // Walk down most loaded, pruning each to the max - int serversOverloaded = 0; - // flag used to fetch regions from head and tail of list, alternately - boolean fetchFromTail = false; - Map serverBalanceInfo = - new TreeMap(); - for (Map.Entry> server: - serversByLoad.descendingMap().entrySet()) { - ServerAndLoad sal = server.getKey(); - int regionCount = sal.getLoad(); - if (regionCount <= max) { - serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0)); - break; - } - serversOverloaded++; - List regions = server.getValue(); - int numToOffload = Math.min(regionCount - max, regions.size()); - // account for the out-of-band regions which were assigned to this server - // after some other region server crashed - Collections.sort(regions, riComparator); - int numTaken = 0; - for (int i = 0; i <= numToOffload; ) { - HRegionInfo hri = regions.get(i); // fetch from head - if (fetchFromTail) { - hri = regions.get(regions.size() - 1 - i); - } - i++; - // Don't rebalance meta regions. 
- if (hri.isMetaRegion()) continue; - regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null)); - numTaken++; - if (numTaken >= numToOffload) break; - // fetch in alternate order if there is new region server - if (emptyRegionServerPresent) { - fetchFromTail = !fetchFromTail; - } - } - serverBalanceInfo.put(sal.getServerName(), - new BalanceInfo(numToOffload, (-1)*numTaken)); - } - int totalNumMoved = regionsToMove.size(); - - // Walk down least loaded, filling each to the min - int neededRegions = 0; // number of regions needed to bring all up to min - fetchFromTail = false; - - Map underloadedServers = new HashMap(); - for (Map.Entry> server: - serversByLoad.entrySet()) { - int regionCount = server.getKey().getLoad(); - if (regionCount >= min) { - break; - } - underloadedServers.put(server.getKey().getServerName(), min - regionCount); - } - // number of servers that get new regions - int serversUnderloaded = underloadedServers.size(); - int incr = 1; - List sns = - Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded])); - Collections.shuffle(sns, RANDOM); - while (regionsToMove.size() > 0) { - int cnt = 0; - int i = incr > 0 ? 
0 : underloadedServers.size()-1; - for (; i >= 0 && i < underloadedServers.size(); i += incr) { - if (regionsToMove.isEmpty()) break; - ServerName si = sns.get(i); - int numToTake = underloadedServers.get(si); - if (numToTake == 0) continue; - - addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn); - if (emptyRegionServerPresent) { - fetchFromTail = !fetchFromTail; - } - - underloadedServers.put(si, numToTake-1); - cnt++; - BalanceInfo bi = serverBalanceInfo.get(si); - if (bi == null) { - bi = new BalanceInfo(0, 0); - serverBalanceInfo.put(si, bi); - } - bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1); - } - if (cnt == 0) break; - // iterates underloadedServers in the other direction - incr = -incr; - } - for (Integer i : underloadedServers.values()) { - // If we still want to take some, increment needed - neededRegions += i; - } - - // If none needed to fill all to min and none left to drain all to max, - // we are done - if (neededRegions == 0 && regionsToMove.isEmpty()) { - long endTime = System.currentTimeMillis(); - LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " + - "Moving " + totalNumMoved + " regions off of " + - serversOverloaded + " overloaded servers onto " + - serversUnderloaded + " less loaded servers"); - return regionsToReturn; - } - - // Need to do a second pass. - // Either more regions to assign out or servers that are still underloaded - - // If we need more to fill min, grab one from each most loaded until enough - if (neededRegions != 0) { - // Walk down most loaded, grabbing one from each until we get enough - for (Map.Entry> server : - serversByLoad.descendingMap().entrySet()) { - BalanceInfo balanceInfo = - serverBalanceInfo.get(server.getKey().getServerName()); - int idx = - balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload(); - if (idx >= server.getValue().size()) break; - HRegionInfo region = server.getValue().get(idx); - if (region.isMetaRegion()) continue; // Don't move meta regions. 
- regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null)); - totalNumMoved++; - if (--neededRegions == 0) { - // No more regions needed, done shedding - break; - } - } - } - - // Now we have a set of regions that must be all assigned out - // Assign each underloaded up to the min, then if leftovers, assign to max - - // Walk down least loaded, assigning to each to fill up to min - for (Map.Entry> server : - serversByLoad.entrySet()) { - int regionCount = server.getKey().getLoad(); - if (regionCount >= min) break; - BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName()); - if(balanceInfo != null) { - regionCount += balanceInfo.getNumRegionsAdded(); - } - if(regionCount >= min) { - continue; - } - int numToTake = min - regionCount; - int numTaken = 0; - while(numTaken < numToTake && 0 < regionsToMove.size()) { - addRegionPlan(regionsToMove, fetchFromTail, - server.getKey().getServerName(), regionsToReturn); - numTaken++; - if (emptyRegionServerPresent) { - fetchFromTail = !fetchFromTail; - } - } - } - - // If we still have regions to dish out, assign underloaded to max - if (0 < regionsToMove.size()) { - for (Map.Entry> server : - serversByLoad.entrySet()) { - int regionCount = server.getKey().getLoad(); - if(regionCount >= max) { - break; - } - addRegionPlan(regionsToMove, fetchFromTail, - server.getKey().getServerName(), regionsToReturn); - if (emptyRegionServerPresent) { - fetchFromTail = !fetchFromTail; - } - if (regionsToMove.isEmpty()) { - break; - } - } - } - - long endTime = System.currentTimeMillis(); - - if (!regionsToMove.isEmpty() || neededRegions != 0) { - // Emit data so can diagnose how balancer went astray. 
- LOG.warn("regionsToMove=" + totalNumMoved + - ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded + - ", serversUnderloaded=" + serversUnderloaded); - StringBuilder sb = new StringBuilder(); - for (Map.Entry> e: clusterState.entrySet()) { - if (sb.length() > 0) sb.append(", "); - sb.append(e.getKey().toString()); - sb.append(" "); - sb.append(e.getValue().size()); - } - LOG.warn("Input " + sb.toString()); - } - - // All done! - LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " + - "Moving " + totalNumMoved + " regions off of " + - serversOverloaded + " overloaded servers onto " + - serversUnderloaded + " less loaded servers"); - - return regionsToReturn; - } + public List balanceCluster(Map> clusterState); /** - * Add a region from the head or tail to the List of regions to return. - */ - void addRegionPlan(final MinMaxPriorityQueue regionsToMove, - final boolean fetchFromTail, final ServerName sn, List regionsToReturn) { - RegionPlan rp = null; - if (!fetchFromTail) rp = regionsToMove.remove(); - else rp = regionsToMove.removeLast(); - rp.setDestination(sn); - regionsToReturn.add(rp); - } - - /** - * @param regions - * @return Randomization of passed regions - */ - static List randomize(final List regions) { - Collections.shuffle(regions, RANDOM); - return regions; - } - - /** - * Stores additional per-server information about the regions added/removed - * during the run of the balancing algorithm. - * - * For servers that shed regions, we need to track which regions we have - * already shed. nextRegionForUnload contains the index in the list - * of regions on the server that is the next to be shed. 
- */ - private static class BalanceInfo { - - private final int nextRegionForUnload; - private int numRegionsAdded; - - public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) { - this.nextRegionForUnload = nextRegionForUnload; - this.numRegionsAdded = numRegionsAdded; - } - - public int getNextRegionForUnload() { - return nextRegionForUnload; - } - - public int getNumRegionsAdded() { - return numRegionsAdded; - } - - public void setNumRegionsAdded(int numAdded) { - this.numRegionsAdded = numAdded; - } - } - - /** - * Generates a bulk assignment plan to be used on cluster startup using a - * simple round-robin assignment. - *

- * Takes a list of all the regions and all the servers in the cluster and - * returns a map of each server to the regions that it should be assigned. - *

- * Currently implemented as a round-robin assignment. Same invariant as - * load balancing, all servers holding floor(avg) or ceiling(avg). - * - * TODO: Use block locations from HDFS to place regions with their blocks - * - * @param regions all regions - * @param servers all servers - * @return map of server to the regions it should take, or null if no - * assignment is possible (ie. no regions or no servers) - */ - public static Map> roundRobinAssignment( - List regions, List servers) { - if (regions.isEmpty() || servers.isEmpty()) { - return null; - } - Map> assignments = - new TreeMap>(); - int numRegions = regions.size(); - int numServers = servers.size(); - int max = (int)Math.ceil((float)numRegions/numServers); - int serverIdx = 0; - if (numServers > 1) { - serverIdx = RANDOM.nextInt(numServers); - } - int regionIdx = 0; - for (int j = 0; j < numServers; j++) { - ServerName server = servers.get((j + serverIdx) % numServers); - List serverRegions = new ArrayList(max); - for (int i=regionIdx; i - * Takes a map of all regions to their existing assignment from META. Also - * takes a list of online servers for regions to be assigned to. Attempts to - * retain all assignment, so in some instances initial assignment will not be - * completely balanced. - *

- * Any leftover regions without an existing server to be assigned to will be - * assigned randomly to available servers. - * @param regions regions and existing assignment from meta - * @param servers available servers - * @return map of servers and regions to be assigned to them - */ - public static Map> retainAssignment( - Map regions, List servers) { - Map> assignments = - new TreeMap>(); - for (ServerName server : servers) { - assignments.put(server, new ArrayList()); - } - for (Map.Entry region : regions.entrySet()) { - ServerName sn = region.getValue(); - if (sn != null && servers.contains(sn)) { - assignments.get(sn).add(region.getKey()); - } else { - int size = assignments.size(); - assignments.get(servers.get(RANDOM.nextInt(size))).add(region.getKey()); - } - } - return assignments; - } - - /** - * Returns an ordered list of hosts that are hosting the blocks for this - * region. The weight of each host is the sum of the block lengths of all - * files on that host, so the first host in the list is the server which - * holds the most bytes of the given region's HFiles. - * - * @param fs the filesystem - * @param region region - * @return ordered list of hosts holding blocks of the specified region - */ - @SuppressWarnings("unused") - private List getTopBlockLocations(FileSystem fs, - HRegionInfo region) { - List topServerNames = null; - try { - HTableDescriptor tableDescriptor = getTableDescriptor( - region.getTableName()); - if (tableDescriptor != null) { - HDFSBlocksDistribution blocksDistribution = - HRegion.computeHDFSBlocksDistribution(config, tableDescriptor, - region.getEncodedName()); - List topHosts = blocksDistribution.getTopHosts(); - topServerNames = mapHostNameToServerName(topHosts); - } - } catch (IOException ioe) { - LOG.debug("IOException during HDFSBlocksDistribution computation. 
for " + - "region = " + region.getEncodedName() , ioe); - } - - return topServerNames; - } - - /** - * return HTableDescriptor for a given tableName - * @param tableName the table name - * @return HTableDescriptor - * @throws IOException - */ - private HTableDescriptor getTableDescriptor(byte[] tableName) - throws IOException { - HTableDescriptor tableDescriptor = null; - try { - if ( this.services != null) - { - tableDescriptor = this.services.getTableDescriptors(). - get(Bytes.toString(tableName)); - } - } catch (TableExistsException tee) { - LOG.debug("TableExistsException during getTableDescriptors." + - " Current table name = " + tableName , tee); - } catch (FileNotFoundException fnfe) { - LOG.debug("FileNotFoundException during getTableDescriptors." + - " Current table name = " + tableName , fnfe); - } - - return tableDescriptor; - } - - /** - * Map hostname to ServerName, The output ServerName list will have the same - * order as input hosts. - * @param hosts the list of hosts - * @return ServerName list - */ - private List mapHostNameToServerName(List hosts) { - if ( hosts == null || status == null) { - return null; - } - - List topServerNames = new ArrayList(); - Collection regionServers = status.getServers(); - - // create a mapping from hostname to ServerName for fast lookup - HashMap hostToServerName = - new HashMap(); - for (ServerName sn : regionServers) { - hostToServerName.put(sn.getHostname(), sn); - } - - for (String host : hosts ) { - ServerName sn = hostToServerName.get(host); - // it is possible that HDFS is up ( thus host is valid ), - // but RS is down ( thus sn is null ) - if (sn != null) { - topServerNames.add(sn); - } - } - return topServerNames; - } - - - /** - * Generates an immediate assignment plan to be used by a new master for - * regions in transition that do not have an already known destination. - * - * Takes a list of regions that need immediate assignment and a list of - * all available servers. 
Returns a map of regions to the server they - * should be assigned to. - * - * This method will return quickly and does not do any intelligent - * balancing. The goal is to make a fast decision not the best decision - * possible. - * - * Currently this is random. - * + * Perform a Round Robin assignment of regions. * @param regions * @param servers - * @return map of regions to the server it should be assigned to + * @return */ - public static Map immediateAssignment( - List regions, List servers) { - Map assignments = - new TreeMap(); - for(HRegionInfo region : regions) { - assignments.put(region, servers.get(RANDOM.nextInt(servers.size()))); - } - return assignments; - } - - public static ServerName randomAssignment(List servers) { - if (servers == null || servers.isEmpty()) { - LOG.warn("Wanted to do random assignment but no servers to assign to"); - return null; - } - return servers.get(RANDOM.nextInt(servers.size())); - } + public Map> roundRobinAssignment(List regions, List servers); /** - * Stores the plan for the move of an individual region. - * - * Contains info for the region being moved, info for the server the region - * should be moved from, and info for the server the region should be moved - * to. - * - * The comparable implementation of this class compares only the region - * information and not the source/dest server info. + * Assign regions to the previously hosting region server + * @param regions + * @param servers + * @return */ - public static class RegionPlan implements Comparable { - private final HRegionInfo hri; - private final ServerName source; - private ServerName dest; + public Map> retainAssignment(Map regions, List servers); - /** - * Instantiate a plan for a region move, moving the specified region from - * the specified source server to the specified destination server. - * - * Destination server can be instantiated as null and later set - * with {@link #setDestination(ServerName)}. 
- * - * @param hri region to be moved - * @param source regionserver region should be moved from - * @param dest regionserver region should be moved to - */ - public RegionPlan(final HRegionInfo hri, ServerName source, ServerName dest) { - this.hri = hri; - this.source = source; - this.dest = dest; - } + /** + * Sync assign a region + * @param regions + * @param servers + * @return + */ + public Map immediateAssignment(List regions, List servers); - /** - * Set the destination server for the plan for this region. - */ - public void setDestination(ServerName dest) { - this.dest = dest; - } - - /** - * Get the source server for the plan for this region. - * @return server info for source - */ - public ServerName getSource() { - return source; - } - - /** - * Get the destination server for the plan for this region. - * @return server info for destination - */ - public ServerName getDestination() { - return dest; - } - - /** - * Get the encoded region name for the region this plan is for. - * @return Encoded region name - */ - public String getRegionName() { - return this.hri.getEncodedName(); - } - - public HRegionInfo getRegionInfo() { - return this.hri; - } - - /** - * Compare the region info. - * @param o region plan you are comparing against - */ - @Override - public int compareTo(RegionPlan o) { - return getRegionName().compareTo(o.getRegionName()); - } - - @Override - public String toString() { - return "hri=" + this.hri.getRegionNameAsString() + ", src=" + - (this.source == null? "": this.source.toString()) + - ", dest=" + (this.dest == null? 
"": this.dest.toString()); - } - } + /** + * Get a random region server from the list + * @param servers + * @return + */ + public ServerName randomAssignment(List servers); } diff --git a/src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java b/src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java new file mode 100644 index 00000000000..69f76b3d5a2 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java @@ -0,0 +1,43 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * The class that creates a load balancer from a conf. + */ +public class LoadBalancerFactory { + + /** + * Create a loadblanacer from the given conf. 
+ * @param conf + * @return + */ + public static LoadBalancer getLoadBalancer(Configuration conf) { + + // Create the balancer + Class balancerKlass = conf.getClass("hbase.master.loadBalancer.class",DefaultLoadBalancer.class, LoadBalancer.class); + return ReflectionUtils.newInstance(balancerKlass, conf); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java b/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java new file mode 100644 index 00000000000..1561e00be4a --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java @@ -0,0 +1,88 @@ +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; + +/** + * Stores the plan for the move of an individual region. + * + * Contains info for the region being moved, info for the server the region + * should be moved from, and info for the server the region should be moved + * to. + * + * The comparable implementation of this class compares only the region + * information and not the source/dest server info. + */ +public class RegionPlan implements Comparable { + private final HRegionInfo hri; + private final ServerName source; + private ServerName dest; + + /** + * Instantiate a plan for a region move, moving the specified region from + * the specified source server to the specified destination server. + * + * Destination server can be instantiated as null and later set + * with {@link #setDestination(ServerName)}. + * + * @param hri region to be moved + * @param source regionserver region should be moved from + * @param dest regionserver region should be moved to + */ + public RegionPlan(final HRegionInfo hri, ServerName source, ServerName dest) { + this.hri = hri; + this.source = source; + this.dest = dest; + } + + /** + * Set the destination server for the plan for this region. 
+ */ + public void setDestination(ServerName dest) { + this.dest = dest; + } + + /** + * Get the source server for the plan for this region. + * @return server info for source + */ + public ServerName getSource() { + return source; + } + + /** + * Get the destination server for the plan for this region. + * @return server info for destination + */ + public ServerName getDestination() { + return dest; + } + + /** + * Get the encoded region name for the region this plan is for. + * @return Encoded region name + */ + public String getRegionName() { + return this.hri.getEncodedName(); + } + + public HRegionInfo getRegionInfo() { + return this.hri; + } + + /** + * Compare the region info. + * @param o region plan you are comparing against + */ + @Override + public int compareTo(RegionPlan o) { + return getRegionName().compareTo(o.getRegionName()); + } + + @Override + public String toString() { + return "hri=" + this.hri.getRegionNameAsString() + ", src=" + + (this.source == null? "": this.source.toString()) + + ", dest=" + (this.dest == null? "": this.dest.toString()); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/master/ServerAndLoad.java b/src/main/java/org/apache/hadoop/hbase/master/ServerAndLoad.java new file mode 100644 index 00000000000..ac836d98cd1 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/master/ServerAndLoad.java @@ -0,0 +1,31 @@ +package org.apache.hadoop.hbase.master; + + +import org.apache.hadoop.hbase.ServerName; + +/** + * Data structure that holds servername and 'load'. + */ +class ServerAndLoad implements Comparable { + private final ServerName sn; + private final int load; + + ServerAndLoad(final ServerName sn, final int load) { + this.sn = sn; + this.load = load; + } + + ServerName getServerName() { + return this.sn; + } + + int getLoad() { + return this.load; + } + + @Override + public int compareTo(ServerAndLoad other) { + int diff = this.load - other.load; + return diff != 0 ? 
diff : this.sn.compareTo(other.getServerName()); + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java b/src/test/java/org/apache/hadoop/hbase/master/TestDefaultLoadBalancer.java similarity index 74% rename from src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java rename to src/test/java/org/apache/hadoop/hbase/master/TestDefaultLoadBalancer.java index 78da7feffea..a4cd9a36a56 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestDefaultLoadBalancer.java @@ -1,5 +1,5 @@ /** - * Copyright 2010 The Apache Software Foundation + * Copyright 2011 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -23,32 +23,25 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Random; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; import org.apache.hadoop.hbase.util.Bytes; import org.junit.BeforeClass; import org.junit.Test; -public class TestLoadBalancer { - private static final Log LOG = LogFactory.getLog(TestLoadBalancer.class); + +/** + * Test the load balancer that is created by default. 
+ */ +public class TestDefaultLoadBalancer { + private static final Log LOG = LogFactory.getLog(TestDefaultLoadBalancer.class); + private static final Random RANDOM = new Random(System.currentTimeMillis()); private static LoadBalancer loadBalancer; @@ -58,7 +51,8 @@ public class TestLoadBalancer { public static void beforeAllTests() throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.regions.slop", "0"); - loadBalancer = new LoadBalancer(conf); + loadBalancer = new DefaultLoadBalancer(); + loadBalancer.setConf(conf); rand = new Random(); } @@ -138,38 +132,6 @@ public class TestLoadBalancer { new int [] { 12, 100 }, }; - @Test - public void testRandomizer() { - for(int [] mockCluster : clusterStateMocks) { - if (mockCluster.length < 5) continue; - Map> servers = - mockClusterServers(mockCluster); - for (Map.Entry> e: servers.entrySet()) { - List original = e.getValue(); - if (original.size() < 5) continue; - // Try ten times in case random chances upon original order more than - // one or two times in a row. - boolean same = true; - for (int i = 0; i < 10 && same; i++) { - List copy = new ArrayList(original); - System.out.println("Randomizing before " + copy.size()); - for (HRegionInfo hri: copy) { - System.out.println(hri.getEncodedName()); - } - List randomized = LoadBalancer.randomize(copy); - System.out.println("Randomizing after " + randomized.size()); - for (HRegionInfo hri: randomized) { - System.out.println(hri.getEncodedName()); - } - if (original.equals(randomized)) continue; - same = false; - break; - } - assertFalse(same); - } - } - } - /** * Test the load balancing algorithm. 
* @@ -183,10 +145,10 @@ public class TestLoadBalancer { for(int [] mockCluster : clusterStateMocks) { Map> servers = mockClusterServers(mockCluster); - List list = convertToList(servers); + List list = convertToList(servers); LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); List plans = loadBalancer.balanceCluster(servers); - List balancedCluster = reconcile(list, plans); + List balancedCluster = reconcile(list, plans); LOG.info("Mock Balance : " + printMock(balancedCluster)); assertClusterAsBalanced(balancedCluster); for(Map.Entry> entry : servers.entrySet()) { @@ -201,12 +163,12 @@ public class TestLoadBalancer { * Invariant is that all servers have between floor(avg) and ceiling(avg) * number of regions. */ - public void assertClusterAsBalanced(List servers) { + public void assertClusterAsBalanced(List servers) { int numServers = servers.size(); int numRegions = 0; int maxRegions = 0; int minRegions = Integer.MAX_VALUE; - for(LoadBalancer.ServerAndLoad server : servers) { + for(ServerAndLoad server : servers) { int nr = server.getLoad(); if(nr > maxRegions) { maxRegions = nr; @@ -223,7 +185,7 @@ public class TestLoadBalancer { int min = numRegions / numServers; int max = numRegions % numServers == 0 ? 
min : min + 1; - for(LoadBalancer.ServerAndLoad server : servers) { + for(ServerAndLoad server : servers) { assertTrue(server.getLoad() <= max); assertTrue(server.getLoad() >= min); } @@ -241,10 +203,10 @@ public class TestLoadBalancer { for(int [] mock : regionsAndServersMocks) { LOG.debug("testImmediateAssignment with " + mock[0] + " regions and " + mock[1] + " servers"); List regions = randomRegions(mock[0]); - List servers = randomServers(mock[1], 0); + List servers = randomServers(mock[1], 0); List list = getListOfServerNames(servers); Map assignments = - LoadBalancer.immediateAssignment(regions, list); + loadBalancer.immediateAssignment(regions, list); assertImmediateAssignment(regions, list, assignments); returnRegions(regions); returnServers(list); @@ -277,10 +239,10 @@ public class TestLoadBalancer { for(int [] mock : regionsAndServersMocks) { LOG.debug("testBulkAssignment with " + mock[0] + " regions and " + mock[1] + " servers"); List regions = randomRegions(mock[0]); - List servers = randomServers(mock[1], 0); + List servers = randomServers(mock[1], 0); List list = getListOfServerNames(servers); Map> assignments = - LoadBalancer.roundRobinAssignment(regions, list); + loadBalancer.roundRobinAssignment(regions, list); float average = (float)regions.size()/servers.size(); int min = (int)Math.floor(average); int max = (int)Math.ceil(average); @@ -302,7 +264,7 @@ public class TestLoadBalancer { @Test public void testRetainAssignment() throws Exception { // Test simple case where all same servers are there - List servers = randomServers(10, 10); + List servers = randomServers(10, 10); List regions = randomRegions(100); Map existing = new TreeMap(); @@ -311,31 +273,31 @@ public class TestLoadBalancer { } List listOfServerNames = getListOfServerNames(servers); Map> assignment = - LoadBalancer.retainAssignment(existing, listOfServerNames); + loadBalancer.retainAssignment(existing, listOfServerNames); assertRetainedAssignment(existing, listOfServerNames, 
assignment); // Include two new servers that were not there before - List servers2 = - new ArrayList(servers); + List servers2 = + new ArrayList(servers); servers2.add(randomServer(10)); servers2.add(randomServer(10)); listOfServerNames = getListOfServerNames(servers2); - assignment = LoadBalancer.retainAssignment(existing, listOfServerNames); + assignment = loadBalancer.retainAssignment(existing, listOfServerNames); assertRetainedAssignment(existing, listOfServerNames, assignment); // Remove two of the servers that were previously there - List servers3 = - new ArrayList(servers); + List servers3 = + new ArrayList(servers); servers3.remove(servers3.size()-1); servers3.remove(servers3.size()-2); listOfServerNames = getListOfServerNames(servers2); - assignment = LoadBalancer.retainAssignment(existing, listOfServerNames); + assignment = loadBalancer.retainAssignment(existing, listOfServerNames); assertRetainedAssignment(existing, listOfServerNames, assignment); } - private List getListOfServerNames(final List sals) { + private List getListOfServerNames(final List sals) { List list = new ArrayList(); - for (LoadBalancer.ServerAndLoad e: sals) { + for (ServerAndLoad e: sals) { list.add(e.getServerName()); } return list; @@ -380,10 +342,10 @@ public class TestLoadBalancer { } } - private String printStats(List servers) { + private String printStats(List servers) { int numServers = servers.size(); int totalRegions = 0; - for(LoadBalancer.ServerAndLoad server : servers) { + for(ServerAndLoad server : servers) { totalRegions += server.getLoad(); } float average = (float)totalRegions / numServers; @@ -392,11 +354,11 @@ public class TestLoadBalancer { return "[srvr=" + numServers + " rgns=" + totalRegions + " avg=" + average + " max=" + max + " min=" + min + "]"; } - private List convertToList(final Map> servers) { - List list = - new ArrayList(servers.size()); + private List convertToList(final Map> servers) { + List list = + new ArrayList(servers.size()); for (Map.Entry> e: 
servers.entrySet()) { - list.add(new LoadBalancer.ServerAndLoad(e.getKey(), e.getValue().size())); + list.add(new ServerAndLoad(e.getKey(), e.getValue().size())); } return list; } @@ -405,11 +367,11 @@ public class TestLoadBalancer { return printMock(convertToList(servers)); } - private String printMock(List balancedCluster) { - SortedSet sorted = - new TreeSet(balancedCluster); - LoadBalancer.ServerAndLoad [] arr = - sorted.toArray(new LoadBalancer.ServerAndLoad[sorted.size()]); + private String printMock(List balancedCluster) { + SortedSet sorted = + new TreeSet(balancedCluster); + ServerAndLoad [] arr = + sorted.toArray(new ServerAndLoad[sorted.size()]); StringBuilder sb = new StringBuilder(sorted.size() * 4 + 4); sb.append("{ "); for(int i = 0; i < arr.length; i++) { @@ -425,17 +387,17 @@ public class TestLoadBalancer { /** * This assumes the RegionPlan HSI instances are the same ones in the map, so * actually no need to even pass in the map, but I think it's clearer. - * @param servers + * @param list * @param plans * @return */ - private List reconcile(List list, + private List reconcile(List list, List plans) { - List result = - new ArrayList(list.size()); + List result = + new ArrayList(list.size()); if (plans == null) return result; - Map map = - new HashMap(list.size()); + Map map = + new HashMap(list.size()); for (RegionPlan plan : plans) { ServerName source = plan.getSource(); updateLoad(map, source, -1); @@ -447,11 +409,11 @@ public class TestLoadBalancer { return result; } - private void updateLoad(Map map, + private void updateLoad(Map map, final ServerName sn, final int diff) { - LoadBalancer.ServerAndLoad sal = map.get(sn); + ServerAndLoad sal = map.get(sn); if (sal == null) return; - sal = new LoadBalancer.ServerAndLoad(sn, sal.getLoad() + diff); + sal = new ServerAndLoad(sn, sal.getLoad() + diff); map.put(sn, sal); } @@ -462,7 +424,7 @@ public class TestLoadBalancer { new TreeMap>(); for(int i = 0; i < numServers; i++) { int numRegions = 
mockCluster[i]; - LoadBalancer.ServerAndLoad sal = randomServer(0); + ServerAndLoad sal = randomServer(0); List regions = randomRegions(numRegions); servers.put(sal.getServerName(), regions); } @@ -499,21 +461,21 @@ public class TestLoadBalancer { private Queue serverQueue = new LinkedList(); - private LoadBalancer.ServerAndLoad randomServer(final int numRegionsPerServer) { + private ServerAndLoad randomServer(final int numRegionsPerServer) { if (!this.serverQueue.isEmpty()) { ServerName sn = this.serverQueue.poll(); - return new LoadBalancer.ServerAndLoad(sn, numRegionsPerServer); + return new ServerAndLoad(sn, numRegionsPerServer); } String host = "127.0.0.1"; int port = rand.nextInt(60000); long startCode = rand.nextLong(); ServerName sn = new ServerName(host, port, startCode); - return new LoadBalancer.ServerAndLoad(sn, numRegionsPerServer); + return new ServerAndLoad(sn, numRegionsPerServer); } - private List randomServers(int numServers, int numRegionsPerServer) { - List servers = - new ArrayList(numServers); + private List randomServers(int numServers, int numRegionsPerServer) { + List servers = + new ArrayList(numServers); for (int i = 0; i < numServers; i++) { servers.add(randomServer(numRegionsPerServer)); } diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java index a2ff322e4c8..978092fff09 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.master.AssignmentManager.RegionState; -import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; +import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.regionserver.HRegion; 
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes;