HBASE-3609 Improve the selection of regions to balance; part 2
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1094679 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
04ac5314bd
commit
ddb730cfbd
|
@ -172,6 +172,7 @@ Release 0.91.0 - Unreleased
|
|||
HBASE-3699 Make RegionServerServices and MasterServices extend Server
|
||||
(Erik Onnen)
|
||||
HBASE-3757 Upgrade to ZK 3.3.3
|
||||
HBASE-3609 Improve the selection of regions to balance; part 2 (Ted Yu)
|
||||
|
||||
TASKS
|
||||
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -525,7 +525,7 @@
|
|||
<commons-lang.version>2.5</commons-lang.version>
|
||||
<commons-logging.version>1.1.1</commons-logging.version>
|
||||
<commons-math.version>2.1</commons-math.version>
|
||||
<guava.version>r06</guava.version>
|
||||
<guava.version>r09</guava.version>
|
||||
<!--The below was made by patching branch-0.20-append
|
||||
at revision 1034499 with this hdfs-895 patch:
|
||||
https://issues.apache.org/jira/secure/attachment/12459473/hdfs-895-branch-20-append.txt
|
||||
|
|
|
@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.master;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
|
@ -41,6 +43,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
|
||||
import com.google.common.collect.MinMaxPriorityQueue;
|
||||
|
||||
/**
|
||||
* Makes decisions about the placement and movement of Regions across
|
||||
* RegionServers.
|
||||
|
@ -69,6 +73,25 @@ public class LoadBalancer {
|
|||
else if (slop > 1) slop = 1;
|
||||
}
|
||||
|
||||
/*
|
||||
* The following comparator assumes that RegionId from HRegionInfo can
|
||||
* represent the age of the region - larger RegionId means the region
|
||||
* is younger.
|
||||
* This comparator is used in balanceCluster() to account for the out-of-band
|
||||
* regions which were assigned to the server after some other region server
|
||||
* crashed.
|
||||
*/
|
||||
static class RegionInfoComparator implements Comparator<HRegionInfo> {
|
||||
@Override
|
||||
public int compare(HRegionInfo l, HRegionInfo r) {
|
||||
long diff = r.getRegionId() - l.getRegionId();
|
||||
if (diff < 0) return -1;
|
||||
if (diff > 0) return 1;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
static RegionInfoComparator riComparator = new RegionInfoComparator();
|
||||
|
||||
static class RegionPlanComparator implements Comparator<RegionPlan> {
|
||||
@Override
|
||||
public int compare(RegionPlan l, RegionPlan r) {
|
||||
|
@ -89,6 +112,22 @@ public class LoadBalancer {
|
|||
* all servers will be balanced to the average. Otherwise, all servers will
|
||||
* have either floor(average) or ceiling(average) regions.
|
||||
*
|
||||
* HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
|
||||
* we can fetch from both ends of the queue.
|
||||
* At the beginning, we check whether there was an empty region server
|
||||
* just discovered by Master. If so, we alternately choose new / old
|
||||
* regions from head / tail of regionsToMove, respectively. This alternation
|
||||
* avoids clustering young regions on the newly discovered region server.
|
||||
* Otherwise, we choose new regions from head of regionsToMove.
|
||||
*
|
||||
* Another improvement from HBASE-3609 is that we assign regions from
|
||||
* regionsToMove to underloaded servers in round-robin fashion.
|
||||
* Previously one underloaded server would be filled before we moved on to
|
||||
* the next underloaded server, leading to clustering of young regions.
|
||||
*
|
||||
* Finally, we randomly shuffle underloaded servers so that they receive
|
||||
* offloaded regions relatively evenly across calls to balanceCluster().
|
||||
*
|
||||
* The algorithm is currently implemented as such:
|
||||
*
|
||||
* <ol>
|
||||
|
@ -110,7 +149,7 @@ public class LoadBalancer {
|
|||
* regions shed to fill each underloaded server to <b>MIN</b>. If so we
|
||||
* end up with a number of regions required to do so, <b>neededRegions</b>.
|
||||
*
|
||||
* It is also possible that we were able fill each underloaded but ended
|
||||
* It is also possible that we were able to fill each underloaded but ended
|
||||
* up with regions that were unassigned from overloaded servers but that
|
||||
* still do not have assignment.
|
||||
*
|
||||
|
@ -125,7 +164,6 @@ public class LoadBalancer {
|
|||
*
|
||||
* <li>We now definitely have more regions that need assignment, either from
|
||||
* the previous step or from the original shedding from overloaded servers.
|
||||
*
|
||||
* Iterate the least loaded servers filling each to <b>MIN</b>.
|
||||
*
|
||||
* <li>If we still have more regions that need assignment, again iterate the
|
||||
|
@ -152,6 +190,7 @@ public class LoadBalancer {
|
|||
*/
|
||||
public List<RegionPlan> balanceCluster(
|
||||
Map<HServerInfo,List<HRegionInfo>> clusterState) {
|
||||
boolean emptyRegionServerPresent = false;
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
// Make a map sorted by load and count regions
|
||||
|
@ -167,7 +206,9 @@ public class LoadBalancer {
|
|||
// Iterate so we can count regions as we build the map
|
||||
for(Map.Entry<HServerInfo, List<HRegionInfo>> server:
|
||||
clusterState.entrySet()) {
|
||||
server.getKey().getLoad().setNumberOfRegions(server.getValue().size());
|
||||
int sz = server.getValue().size();
|
||||
if (sz == 0) emptyRegionServerPresent = true;
|
||||
server.getKey().getLoad().setNumberOfRegions(sz);
|
||||
numRegions += server.getKey().getLoad().getNumberOfRegions();
|
||||
serversByLoad.put(server.getKey(), server.getValue());
|
||||
}
|
||||
|
@ -191,11 +232,13 @@ public class LoadBalancer {
|
|||
|
||||
// Balance the cluster
|
||||
// TODO: Look at data block locality or a more complex load to do this
|
||||
List<RegionPlan> regionsToMove = new ArrayList<RegionPlan>();
|
||||
int regionidx = 0; // track the index in above list for setting destination
|
||||
MinMaxPriorityQueue<RegionPlan> regionsToMove = MinMaxPriorityQueue.orderedBy(rpComparator).create();
|
||||
List<RegionPlan> regionsToReturn = new ArrayList<RegionPlan>();
|
||||
|
||||
// Walk down most loaded, pruning each to the max
|
||||
int serversOverloaded = 0;
|
||||
// flag used to fetch regions from head and tail of list, alternately
|
||||
boolean fetchFromTail = false;
|
||||
Map<HServerInfo,BalanceInfo> serverBalanceInfo =
|
||||
new TreeMap<HServerInfo,BalanceInfo>();
|
||||
for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
|
||||
|
@ -207,54 +250,98 @@ public class LoadBalancer {
|
|||
break;
|
||||
}
|
||||
serversOverloaded++;
|
||||
List<HRegionInfo> regions = randomize(server.getValue());
|
||||
List<HRegionInfo> regions = server.getValue();
|
||||
int numToOffload = Math.min(regionCount - max, regions.size());
|
||||
// account for the out-of-band regions which were assigned to this server
|
||||
// after some other region server crashed
|
||||
Collections.sort(regions, riComparator);
|
||||
int numTaken = 0;
|
||||
for (int i = regions.size() - 1; i >= 0; i--) {
|
||||
HRegionInfo hri = regions.get(i);
|
||||
for (int i = 0; i <= numToOffload; ) {
|
||||
HRegionInfo hri = regions.get(i); // fetch from head
|
||||
if (fetchFromTail) {
|
||||
hri = regions.get(regions.size() - 1 - i);
|
||||
}
|
||||
i++;
|
||||
// Don't rebalance meta regions.
|
||||
if (hri.isMetaRegion()) continue;
|
||||
regionsToMove.add(new RegionPlan(hri, serverInfo, null));
|
||||
numTaken++;
|
||||
if (numTaken >= numToOffload) break;
|
||||
// fetch in alternate order if there is a new region server
|
||||
if (emptyRegionServerPresent) {
|
||||
fetchFromTail = !fetchFromTail;
|
||||
}
|
||||
}
|
||||
serverBalanceInfo.put(serverInfo,
|
||||
new BalanceInfo(numToOffload, (-1)*numTaken));
|
||||
}
|
||||
int totalNumMoved = regionsToMove.size();
|
||||
|
||||
// Walk down least loaded, filling each to the min
|
||||
int serversUnderloaded = 0; // number of servers that get new regions
|
||||
int neededRegions = 0; // number of regions needed to bring all up to min
|
||||
fetchFromTail = false;
|
||||
RegionPlan rp = null;
|
||||
Map<HServerInfo, Integer> underloadedServers = new HashMap<HServerInfo, Integer>();
|
||||
for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
|
||||
serversByLoad.entrySet()) {
|
||||
int regionCount = server.getKey().getLoad().getNumberOfRegions();
|
||||
if(regionCount >= min) {
|
||||
break;
|
||||
}
|
||||
serversUnderloaded++;
|
||||
int numToTake = min - regionCount;
|
||||
int numTaken = 0;
|
||||
while(numTaken < numToTake && regionidx < regionsToMove.size()) {
|
||||
regionsToMove.get(regionidx).setDestination(server.getKey());
|
||||
numTaken++;
|
||||
regionidx++;
|
||||
underloadedServers.put(server.getKey(), min - regionCount);
|
||||
}
|
||||
serverBalanceInfo.put(server.getKey(), new BalanceInfo(0, numTaken));
|
||||
// number of servers that get new regions
|
||||
int serversUnderloaded = underloadedServers.size();
|
||||
int incr = 1;
|
||||
List<HServerInfo> serverInfos = Arrays.asList(underloadedServers.keySet().
|
||||
toArray(new HServerInfo[serversUnderloaded]));
|
||||
Collections.shuffle(serverInfos, RANDOM);
|
||||
while (regionsToMove.size() > 0) {
|
||||
int cnt = 0;
|
||||
int i = incr > 0 ? 0 : underloadedServers.size()-1;
|
||||
for (; i >= 0 && i < underloadedServers.size(); i += incr) {
|
||||
if (0 == regionsToMove.size()) break;
|
||||
HServerInfo si = serverInfos.get(i);
|
||||
int numToTake = underloadedServers.get(si);
|
||||
if (numToTake == 0) continue;
|
||||
|
||||
if (!fetchFromTail) rp = regionsToMove.remove();
|
||||
else rp = regionsToMove.removeLast();
|
||||
rp.setDestination(si);
|
||||
regionsToReturn.add(rp);
|
||||
|
||||
if (emptyRegionServerPresent) {
|
||||
fetchFromTail = !fetchFromTail;
|
||||
}
|
||||
|
||||
underloadedServers.put(si, numToTake-1);
|
||||
cnt++;
|
||||
BalanceInfo bi = serverBalanceInfo.get(si);
|
||||
if (bi == null) {
|
||||
bi = new BalanceInfo(0, 0);
|
||||
serverBalanceInfo.put(si, bi);
|
||||
}
|
||||
bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
|
||||
}
|
||||
if (cnt == 0) break;
|
||||
// iterates underloadedServers in the other direction
|
||||
LOG.info("First pass inner loop assigned " + cnt + " regions");
|
||||
incr = -incr;
|
||||
}
|
||||
for (Integer i : underloadedServers.values()) {
|
||||
// If we still want to take some, increment needed
|
||||
if(numTaken < numToTake) {
|
||||
neededRegions += (numToTake - numTaken);
|
||||
}
|
||||
neededRegions += i;
|
||||
}
|
||||
|
||||
// If none needed to fill all to min and none left to drain all to max,
|
||||
// we are done
|
||||
if(neededRegions == 0 && regionidx == regionsToMove.size()) {
|
||||
if(neededRegions == 0 && 0 == regionsToMove.size()) {
|
||||
long endTime = System.currentTimeMillis();
|
||||
LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
|
||||
"Moving " + regionsToMove.size() + " regions off of " +
|
||||
"Moving " + totalNumMoved + " regions off of " +
|
||||
serversOverloaded + " overloaded servers onto " +
|
||||
serversUnderloaded + " less loaded servers");
|
||||
return regionsToMove;
|
||||
return regionsToReturn;
|
||||
}
|
||||
|
||||
// Need to do a second pass.
|
||||
|
@ -272,6 +359,7 @@ public class LoadBalancer {
|
|||
HRegionInfo region = server.getValue().get(idx);
|
||||
if (region.isMetaRegion()) continue; // Don't move meta regions.
|
||||
regionsToMove.add(new RegionPlan(region, server.getKey(), null));
|
||||
totalNumMoved++;
|
||||
if(--neededRegions == 0) {
|
||||
// No more regions needed, done shedding
|
||||
break;
|
||||
|
@ -296,24 +384,35 @@ public class LoadBalancer {
|
|||
}
|
||||
int numToTake = min - regionCount;
|
||||
int numTaken = 0;
|
||||
while(numTaken < numToTake && regionidx < regionsToMove.size()) {
|
||||
regionsToMove.get(regionidx).setDestination(server.getKey());
|
||||
while(numTaken < numToTake && 0 < regionsToMove.size()) {
|
||||
if (!fetchFromTail) rp = regionsToMove.remove();
|
||||
else rp = regionsToMove.removeLast();
|
||||
rp.setDestination(server.getKey());
|
||||
regionsToReturn.add(rp);
|
||||
|
||||
numTaken++;
|
||||
regionidx++;
|
||||
if (emptyRegionServerPresent) {
|
||||
fetchFromTail = !fetchFromTail;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we still have regions to dish out, assign underloaded to max
|
||||
if(regionidx != regionsToMove.size()) {
|
||||
if(0 < regionsToMove.size()) {
|
||||
for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
|
||||
serversByLoad.entrySet()) {
|
||||
int regionCount = server.getKey().getLoad().getNumberOfRegions();
|
||||
if(regionCount >= max) {
|
||||
break;
|
||||
}
|
||||
regionsToMove.get(regionidx).setDestination(server.getKey());
|
||||
regionidx++;
|
||||
if(regionidx == regionsToMove.size()) {
|
||||
if (!fetchFromTail) rp = regionsToMove.remove();
|
||||
else rp = regionsToMove.removeLast();
|
||||
rp.setDestination(server.getKey());
|
||||
regionsToReturn.add(rp);
|
||||
if (emptyRegionServerPresent) {
|
||||
fetchFromTail = !fetchFromTail;
|
||||
}
|
||||
if(0 == regionsToMove.size()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -321,9 +420,9 @@ public class LoadBalancer {
|
|||
|
||||
long endTime = System.currentTimeMillis();
|
||||
|
||||
if (regionidx != regionsToMove.size() || neededRegions != 0) {
|
||||
if (0 != regionsToMove.size() || neededRegions != 0) {
|
||||
// Emit data so can diagnose how balancer went astray.
|
||||
LOG.warn("regionidx=" + regionidx + ", regionsToMove=" + regionsToMove.size() +
|
||||
LOG.warn("regionsToMove=" + totalNumMoved +
|
||||
", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
|
||||
", serversUnderloaded=" + serversUnderloaded);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
@ -337,12 +436,12 @@ public class LoadBalancer {
|
|||
}
|
||||
|
||||
// All done!
|
||||
LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
|
||||
"Moving " + regionsToMove.size() + " regions off of " +
|
||||
LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
|
||||
"Moving " + totalNumMoved + " regions off of " +
|
||||
serversOverloaded + " overloaded servers onto " +
|
||||
serversUnderloaded + " less loaded servers");
|
||||
|
||||
return regionsToMove;
|
||||
return regionsToReturn;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -370,7 +469,7 @@ public class LoadBalancer {
|
|||
private static class BalanceInfo {
|
||||
|
||||
private final int nextRegionForUnload;
|
||||
private final int numRegionsAdded;
|
||||
private int numRegionsAdded;
|
||||
|
||||
public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
|
||||
this.nextRegionForUnload = nextRegionForUnload;
|
||||
|
@ -384,6 +483,10 @@ public class LoadBalancer {
|
|||
public int getNumRegionsAdded() {
|
||||
return numRegionsAdded;
|
||||
}
|
||||
|
||||
/**
 * Overwrites the stored count of regions added.
 *
 * @param numAdded new value for {@code numRegionsAdded}
 */
public void setNumRegionsAdded(int numAdded) {
|
||||
this.numRegionsAdded = numAdded;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -35,8 +34,6 @@ import java.util.SortedSet;
|
|||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -141,38 +138,6 @@ public class TestLoadBalancer {
|
|||
new int [] { 12, 100 },
|
||||
};
|
||||
|
||||
@Test
|
||||
public void testRandomizer() {
|
||||
for(int [] mockCluster : clusterStateMocks) {
|
||||
if (mockCluster.length < 5) continue;
|
||||
Map<HServerInfo, List<HRegionInfo>> servers =
|
||||
mockClusterServers(mockCluster);
|
||||
for (Map.Entry<HServerInfo, List<HRegionInfo>> e: servers.entrySet()) {
|
||||
List<HRegionInfo> original = e.getValue();
|
||||
if (original.size() < 5) continue;
|
||||
// Try ten times in case random chances upon original order more than
|
||||
// one or two times in a row.
|
||||
boolean same = true;
|
||||
for (int i = 0; i < 10 && same; i++) {
|
||||
List<HRegionInfo> copy = new ArrayList<HRegionInfo>(original);
|
||||
System.out.println("Randomizing before " + copy.size());
|
||||
for (HRegionInfo hri: copy) {
|
||||
System.out.println(hri.getEncodedName());
|
||||
}
|
||||
List<HRegionInfo> randomized = LoadBalancer.randomize(copy);
|
||||
System.out.println("Randomizing after " + randomized.size());
|
||||
for (HRegionInfo hri: randomized) {
|
||||
System.out.println(hri.getEncodedName());
|
||||
}
|
||||
if (original.equals(randomized)) continue;
|
||||
same = false;
|
||||
break;
|
||||
}
|
||||
assertFalse(same);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the load balancing algorithm.
|
||||
*
|
||||
|
@ -434,6 +399,7 @@ public class TestLoadBalancer {
|
|||
}
|
||||
|
||||
private Queue<HRegionInfo> regionQueue = new LinkedList<HRegionInfo>();
|
||||
static int regionId = 0;
|
||||
|
||||
private List<HRegionInfo> randomRegions(int numRegions) {
|
||||
List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
|
||||
|
@ -449,7 +415,8 @@ public class TestLoadBalancer {
|
|||
Bytes.putInt(start, 0, numRegions << 1);
|
||||
Bytes.putInt(end, 0, (numRegions << 1) + 1);
|
||||
HRegionInfo hri = new HRegionInfo(
|
||||
new HTableDescriptor(Bytes.toBytes("table" + i)), start, end);
|
||||
new HTableDescriptor(Bytes.toBytes("table" + i)), start, end,
|
||||
false, regionId++);
|
||||
regions.add(hri);
|
||||
}
|
||||
return regions;
|
||||
|
|
Loading…
Reference in New Issue