HBASE-17515 Reduce memory footprint of RegionLoads kept by StochasticLoadBalancer (Tim Brown)
This commit is contained in:
parent
ed023058d2
commit
a0fac0894b
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.hbase.RegionLoad;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Wrapper class for the few fields required by the {@link StochasticLoadBalancer}
|
||||
* from the full {@link RegionLoad}.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
class BalancerRegionLoad {
|
||||
private final long readRequestsCount;
|
||||
private final long writeRequestsCount;
|
||||
private final int memStoreSizeMB;
|
||||
private final int storefileSizeMB;
|
||||
|
||||
BalancerRegionLoad(RegionLoad regionLoad) {
|
||||
readRequestsCount = regionLoad.getReadRequestsCount();
|
||||
writeRequestsCount = regionLoad.getWriteRequestsCount();
|
||||
memStoreSizeMB = regionLoad.getMemStoreSizeMB();
|
||||
storefileSizeMB = regionLoad.getStorefileSizeMB();
|
||||
}
|
||||
|
||||
public long getReadRequestsCount() {
|
||||
return readRequestsCount;
|
||||
}
|
||||
|
||||
public long getWriteRequestsCount() {
|
||||
return writeRequestsCount;
|
||||
}
|
||||
|
||||
public int getMemStoreSizeMB() {
|
||||
return memStoreSizeMB;
|
||||
}
|
||||
|
||||
public int getStorefileSizeMB() {
|
||||
return storefileSizeMB;
|
||||
}
|
||||
}
|
|
@ -113,7 +113,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
|
||||
ArrayList<String> tables;
|
||||
HRegionInfo[] regions;
|
||||
Deque<RegionLoad>[] regionLoads;
|
||||
Deque<BalancerRegionLoad>[] regionLoads;
|
||||
private RegionLocationFinder regionFinder;
|
||||
|
||||
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
|
||||
|
@ -161,7 +161,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
|
||||
protected Cluster(
|
||||
Map<ServerName, List<HRegionInfo>> clusterState,
|
||||
Map<String, Deque<RegionLoad>> loads,
|
||||
Map<String, Deque<BalancerRegionLoad>> loads,
|
||||
RegionLocationFinder regionFinder,
|
||||
RackManager rackManager) {
|
||||
this(null, clusterState, loads, regionFinder, rackManager);
|
||||
|
@ -171,7 +171,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
protected Cluster(
|
||||
Collection<HRegionInfo> unassignedRegions,
|
||||
Map<ServerName, List<HRegionInfo>> clusterState,
|
||||
Map<String, Deque<RegionLoad>> loads,
|
||||
Map<String, Deque<BalancerRegionLoad>> loads,
|
||||
RegionLocationFinder regionFinder,
|
||||
RackManager rackManager) {
|
||||
|
||||
|
@ -426,7 +426,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
|
||||
/** Helper for Cluster constructor to handle a region */
|
||||
private void registerRegion(HRegionInfo region, int regionIndex,
|
||||
int serverIndex, Map<String, Deque<RegionLoad>> loads,
|
||||
int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
|
||||
RegionLocationFinder regionFinder) {
|
||||
String tableName = region.getTable().getNameAsString();
|
||||
if (!tablesToIndex.containsKey(tableName)) {
|
||||
|
@ -443,7 +443,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
|
||||
// region load
|
||||
if (loads != null) {
|
||||
Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
|
||||
Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
|
||||
// That could have failed if the RegionLoad is using the other regionName
|
||||
if (rl == null) {
|
||||
// Try getting the region load using encoded name.
|
||||
|
|
|
@ -113,7 +113,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
private static final Random RANDOM = new Random(System.currentTimeMillis());
|
||||
private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
|
||||
|
||||
Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
|
||||
Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<String, Deque<BalancerRegionLoad>>();
|
||||
|
||||
// values are defaults
|
||||
private int maxSteps = 1000000;
|
||||
|
@ -498,8 +498,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
private synchronized void updateRegionLoad() {
|
||||
// We create a new hashmap so that regions that are no longer there are removed.
|
||||
// However we temporarily need the old loads so we can use them to keep the rolling average.
|
||||
Map<String, Deque<RegionLoad>> oldLoads = loads;
|
||||
loads = new HashMap<String, Deque<RegionLoad>>();
|
||||
Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
|
||||
loads = new HashMap<String, Deque<BalancerRegionLoad>>();
|
||||
|
||||
for (ServerName sn : clusterStatus.getServers()) {
|
||||
ServerLoad sl = clusterStatus.getLoad(sn);
|
||||
|
@ -507,16 +507,15 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
continue;
|
||||
}
|
||||
for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
|
||||
Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
|
||||
Deque<BalancerRegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
|
||||
if (rLoads == null) {
|
||||
// There was nothing there
|
||||
rLoads = new ArrayDeque<RegionLoad>();
|
||||
rLoads = new ArrayDeque<BalancerRegionLoad>();
|
||||
} else if (rLoads.size() >= numRegionLoadsToRemember) {
|
||||
rLoads.remove();
|
||||
}
|
||||
rLoads.add(entry.getValue());
|
||||
rLoads.add(new BalancerRegionLoad(entry.getValue()));
|
||||
loads.put(Bytes.toString(entry.getKey()), rLoads);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1251,7 +1250,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
abstract static class CostFromRegionLoadFunction extends CostFunction {
|
||||
|
||||
private ClusterStatus clusterStatus = null;
|
||||
private Map<String, Deque<RegionLoad>> loads = null;
|
||||
private Map<String, Deque<BalancerRegionLoad>> loads = null;
|
||||
private double[] stats = null;
|
||||
CostFromRegionLoadFunction(Configuration conf) {
|
||||
super(conf);
|
||||
|
@ -1261,7 +1260,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
this.clusterStatus = status;
|
||||
}
|
||||
|
||||
void setLoads(Map<String, Deque<RegionLoad>> l) {
|
||||
void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
|
||||
this.loads = l;
|
||||
}
|
||||
|
||||
|
@ -1281,7 +1280,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
|
||||
// for every region on this server get the rl
|
||||
for(int regionIndex:cluster.regionsPerServer[i]) {
|
||||
Collection<RegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
|
||||
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
|
||||
|
||||
// Now if we found a region load get the type of cost that was requested.
|
||||
if (regionLoadList != null) {
|
||||
|
@ -1297,17 +1296,17 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
return costFromArray(stats);
|
||||
}
|
||||
|
||||
protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
|
||||
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
|
||||
double cost = 0;
|
||||
|
||||
for (RegionLoad rl : regionLoadList) {
|
||||
for (BalancerRegionLoad rl : regionLoadList) {
|
||||
cost += getCostFromRl(rl);
|
||||
}
|
||||
return cost / regionLoadList.size();
|
||||
}
|
||||
|
||||
|
||||
protected abstract double getCostFromRl(RegionLoad rl);
|
||||
protected abstract double getCostFromRl(BalancerRegionLoad rl);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1322,11 +1321,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
|
||||
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
|
||||
double cost = 0;
|
||||
double previous = 0;
|
||||
boolean isFirst = true;
|
||||
for (RegionLoad rl : regionLoadList) {
|
||||
for (BalancerRegionLoad rl : regionLoadList) {
|
||||
double current = getCostFromRl(rl);
|
||||
if (isFirst) {
|
||||
isFirst = false;
|
||||
|
@ -1357,7 +1356,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
|
||||
|
||||
@Override
|
||||
protected double getCostFromRl(RegionLoad rl) {
|
||||
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||
return rl.getReadRequestsCount();
|
||||
}
|
||||
}
|
||||
|
@ -1378,7 +1377,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected double getCostFromRl(RegionLoad rl) {
|
||||
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||
return rl.getWriteRequestsCount();
|
||||
}
|
||||
}
|
||||
|
@ -1556,7 +1555,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected double getCostFromRl(RegionLoad rl) {
|
||||
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||
return rl.getMemStoreSizeMB();
|
||||
}
|
||||
}
|
||||
|
@ -1576,7 +1575,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected double getCostFromRl(RegionLoad rl) {
|
||||
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||
return rl.getStorefileSizeMB();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,7 +69,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
ServerLoad sl = mock(ServerLoad.class);
|
||||
|
||||
RegionLoad rl = mock(RegionLoad.class);
|
||||
when(rl.getStores()).thenReturn(i);
|
||||
when(rl.getStorefileSizeMB()).thenReturn(i);
|
||||
|
||||
Map<byte[], RegionLoad> regionLoadMap =
|
||||
new TreeMap<byte[], RegionLoad>(Bytes.BYTES_COMPARATOR);
|
||||
|
@ -85,11 +85,11 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
assertTrue(loadBalancer.loads.get(REGION_KEY) != null);
|
||||
assertTrue(loadBalancer.loads.get(REGION_KEY).size() == 15);
|
||||
|
||||
Queue<RegionLoad> loads = loadBalancer.loads.get(REGION_KEY);
|
||||
Queue<BalancerRegionLoad> loads = loadBalancer.loads.get(REGION_KEY);
|
||||
int i = 0;
|
||||
while(loads.size() > 0) {
|
||||
RegionLoad rl = loads.remove();
|
||||
assertEquals(i + (numClusterStatusToAdd - 15), rl.getStores());
|
||||
BalancerRegionLoad rl = loads.remove();
|
||||
assertEquals(i + (numClusterStatusToAdd - 15), rl.getStorefileSizeMB());
|
||||
i ++;
|
||||
}
|
||||
}
|
||||
|
@ -232,9 +232,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
|
||||
@Test
|
||||
public void testRegionLoadCost() {
|
||||
List<RegionLoad> regionLoads = new ArrayList<>();
|
||||
List<BalancerRegionLoad> regionLoads = new ArrayList<>();
|
||||
for (int i = 1; i < 5; i++) {
|
||||
RegionLoad regionLoad = mock(RegionLoad.class);
|
||||
BalancerRegionLoad regionLoad = mock(BalancerRegionLoad.class);
|
||||
when(regionLoad.getReadRequestsCount()).thenReturn(new Long(i));
|
||||
when(regionLoad.getStorefileSizeMB()).thenReturn(i);
|
||||
regionLoads.add(regionLoad);
|
||||
|
|
Loading…
Reference in New Issue