HBASE-9267 Change region load in load balancer to use circular array.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1516335 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
eclark 2013-08-21 23:43:33 +00:00
parent 46539d2d97
commit 9cb59f5279
3 changed files with 73 additions and 33 deletions

View File

@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -63,7 +65,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
ServerName[] servers;
ArrayList<String> tables;
HRegionInfo[] regions;
List<RegionLoad>[] regionLoads;
Deque<RegionLoad>[] regionLoads;
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
int[][] regionsPerServer; //serverIndex -> region list
@ -85,7 +87,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int numMovedRegions = 0; //num moved regions from the initial configuration
int numMovedMetaRegions = 0; //num of moved regions that are META
protected Cluster(Map<ServerName, List<HRegionInfo>> clusterState, Map<String, List<RegionLoad>> loads,
protected Cluster(Map<ServerName, List<HRegionInfo>> clusterState, Map<String, Deque<RegionLoad>> loads,
RegionLocationFinder regionFinder) {
serversToIndex = new HashMap<String, Integer>();
@ -121,7 +123,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
regionIndexToServerIndex = new int[numRegions];
initialRegionIndexToServerIndex = new int[numRegions];
regionIndexToTableIndex = new int[numRegions];
regionLoads = new List[numRegions];
regionLoads = new Deque[numRegions];
regionLocations = new int[numRegions][];
serverIndicesSortedByRegionCount = new Integer[numServers];
@ -162,7 +164,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
// region load
if (loads != null) {
List<RegionLoad> rl = loads.get(region.getRegionNameAsString());
Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
// That could have failed if the RegionLoad is using the other regionName
if (rl == null) {
// Try getting the region load using encoded name.

View File

@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayList;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -101,7 +103,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private final RegionLocationFinder regionFinder = new RegionLocationFinder();
private ClusterStatus clusterStatus = null;
private Map<String, List<RegionLoad>> loads = new HashMap<String, List<RegionLoad>>();
Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
// values are defaults
private int maxSteps = 1000000;
@ -183,9 +185,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
*/
@Override
public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
//if (!needsBalance(new ClusterLoadState(clusterState))) {
// return null;
//}
if (!needsBalance(new ClusterLoadState(clusterState))) {
return null;
}
long startTime = EnvironmentEdgeManager.currentTimeMillis();
@ -303,8 +305,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private synchronized void updateRegionLoad() {
// We create a new hashmap so that regions that are no longer there are removed.
// However we temporarily need the old loads so we can use them to keep the rolling average.
Map<String, List<RegionLoad>> oldLoads = loads;
loads = new HashMap<String, List<RegionLoad>>();
Map<String, Deque<RegionLoad>> oldLoads = loads;
loads = new HashMap<String, Deque<RegionLoad>>();
for (ServerName sn : clusterStatus.getServers()) {
ServerLoad sl = clusterStatus.getLoad(sn);
@ -312,17 +314,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
continue;
}
for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
List<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
if (rLoads != null) {
// We're only going to keep 15. So if there are that many already take the last 14
if (rLoads.size() >= numRegionLoadsToRemember) {
int numToRemove = 1 + (rLoads.size() - numRegionLoadsToRemember);
rLoads = rLoads.subList(numToRemove, rLoads.size());
}
} else {
Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
if (rLoads == null) {
// There was nothing there
rLoads = new ArrayList<RegionLoad>();
rLoads = new ArrayDeque<RegionLoad>();
} else if (rLoads.size() >= 15) {
rLoads.remove();
}
rLoads.add(entry.getValue());
loads.put(Bytes.toString(entry.getKey()), rLoads);
@ -806,7 +803,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
public abstract static class CostFromRegionLoadFunction extends CostFunction {
private ClusterStatus clusterStatus = null;
private Map<String, List<RegionLoad>> loads = null;
private Map<String, Deque<RegionLoad>> loads = null;
private double[] stats = null;
CostFromRegionLoadFunction(Configuration conf) {
super(conf);
@ -816,7 +813,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
this.clusterStatus = status;
}
void setLoads(Map<String, List<RegionLoad>> l) {
void setLoads(Map<String, Deque<RegionLoad>> l) {
this.loads = l;
}
@ -836,7 +833,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
// for every region on this server get the rl
for(int regionIndex:cluster.regionsPerServer[i]) {
List<RegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
Collection<RegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
// Now if we found a region load get the type of cost that was requested.
if (regionLoadList != null) {
@ -852,7 +849,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return costFromArray(stats);
}
protected double getRegionLoadCost(List<RegionLoad> regionLoadList) {
protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
double cost = 0;
for (RegionLoad rl : regionLoadList) {

View File

@ -17,32 +17,39 @@
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Queue;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Category(MediumTests.class)
public class TestStochasticLoadBalancer extends BalancerTestBase {
public static final String REGION_KEY = "testRegion";
private static StochasticLoadBalancer loadBalancer;
private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class);
@ -111,6 +118,40 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
new int[]{130, 140, 60, 100, 100, 100, 80, 100}
};
@Test
public void testKeepRegionLoad() throws Exception {
ServerName sn = new ServerName("test:8080", 100);
int numClusterStatusToAdd = 20000;
for (int i = 0; i < numClusterStatusToAdd; i++) {
ServerLoad sl = mock(ServerLoad.class);
RegionLoad rl = mock(RegionLoad.class);
when(rl.getStores()).thenReturn(i);
Map<byte[], RegionLoad> regionLoadMap =
new TreeMap<byte[], RegionLoad>(Bytes.BYTES_COMPARATOR);
regionLoadMap.put(Bytes.toBytes(REGION_KEY), rl);
when(sl.getRegionsLoad()).thenReturn(regionLoadMap);
ClusterStatus clusterStatus = mock(ClusterStatus.class);
when(clusterStatus.getServers()).thenReturn(Arrays.asList(sn));
when(clusterStatus.getLoad(sn)).thenReturn(sl);
loadBalancer.setClusterStatus(clusterStatus);
}
assertTrue(loadBalancer.loads.get(REGION_KEY) != null);
assertTrue(loadBalancer.loads.get(REGION_KEY).size() == 15);
Queue<RegionLoad> loads = loadBalancer.loads.get(REGION_KEY);
int i = 0;
while(loads.size() > 0) {
RegionLoad rl = loads.remove();
assertEquals(i + (numClusterStatusToAdd - 15), rl.getStores());
i ++;
}
}
/**
* Test the load balancing algorithm.
*