HBASE-17515 Reduce memory footprint of RegionLoads kept by StochasticLoadBalancer (Tim Brown)
This commit is contained in:
parent
4254d7edf3
commit
a8bb27b2e0
|
@ -0,0 +1,57 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.master.balancer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.RegionLoad;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrapper class for the few fields required by the {@link StochasticLoadBalancer}
|
||||||
|
* from the full {@link RegionLoad}.
|
||||||
|
*/
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
class BalancerRegionLoad {
|
||||||
|
private final long readRequestsCount;
|
||||||
|
private final long writeRequestsCount;
|
||||||
|
private final int memStoreSizeMB;
|
||||||
|
private final int storefileSizeMB;
|
||||||
|
|
||||||
|
BalancerRegionLoad(RegionLoad regionLoad) {
|
||||||
|
readRequestsCount = regionLoad.getReadRequestsCount();
|
||||||
|
writeRequestsCount = regionLoad.getWriteRequestsCount();
|
||||||
|
memStoreSizeMB = regionLoad.getMemStoreSizeMB();
|
||||||
|
storefileSizeMB = regionLoad.getStorefileSizeMB();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getReadRequestsCount() {
|
||||||
|
return readRequestsCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getWriteRequestsCount() {
|
||||||
|
return writeRequestsCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMemStoreSizeMB() {
|
||||||
|
return memStoreSizeMB;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getStorefileSizeMB() {
|
||||||
|
return storefileSizeMB;
|
||||||
|
}
|
||||||
|
}
|
|
@ -118,7 +118,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
|
|
||||||
ArrayList<String> tables;
|
ArrayList<String> tables;
|
||||||
HRegionInfo[] regions;
|
HRegionInfo[] regions;
|
||||||
Deque<RegionLoad>[] regionLoads;
|
Deque<BalancerRegionLoad>[] regionLoads;
|
||||||
private RegionLocationFinder regionFinder;
|
private RegionLocationFinder regionFinder;
|
||||||
|
|
||||||
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
|
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
|
||||||
|
@ -166,7 +166,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
|
|
||||||
protected Cluster(
|
protected Cluster(
|
||||||
Map<ServerName, List<HRegionInfo>> clusterState,
|
Map<ServerName, List<HRegionInfo>> clusterState,
|
||||||
Map<String, Deque<RegionLoad>> loads,
|
Map<String, Deque<BalancerRegionLoad>> loads,
|
||||||
RegionLocationFinder regionFinder,
|
RegionLocationFinder regionFinder,
|
||||||
RackManager rackManager) {
|
RackManager rackManager) {
|
||||||
this(null, clusterState, loads, regionFinder, rackManager);
|
this(null, clusterState, loads, regionFinder, rackManager);
|
||||||
|
@ -176,7 +176,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
protected Cluster(
|
protected Cluster(
|
||||||
Collection<HRegionInfo> unassignedRegions,
|
Collection<HRegionInfo> unassignedRegions,
|
||||||
Map<ServerName, List<HRegionInfo>> clusterState,
|
Map<ServerName, List<HRegionInfo>> clusterState,
|
||||||
Map<String, Deque<RegionLoad>> loads,
|
Map<String, Deque<BalancerRegionLoad>> loads,
|
||||||
RegionLocationFinder regionFinder,
|
RegionLocationFinder regionFinder,
|
||||||
RackManager rackManager) {
|
RackManager rackManager) {
|
||||||
|
|
||||||
|
@ -431,7 +431,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
|
|
||||||
/** Helper for Cluster constructor to handle a region */
|
/** Helper for Cluster constructor to handle a region */
|
||||||
private void registerRegion(HRegionInfo region, int regionIndex,
|
private void registerRegion(HRegionInfo region, int regionIndex,
|
||||||
int serverIndex, Map<String, Deque<RegionLoad>> loads,
|
int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
|
||||||
RegionLocationFinder regionFinder) {
|
RegionLocationFinder regionFinder) {
|
||||||
String tableName = region.getTable().getNameAsString();
|
String tableName = region.getTable().getNameAsString();
|
||||||
if (!tablesToIndex.containsKey(tableName)) {
|
if (!tablesToIndex.containsKey(tableName)) {
|
||||||
|
@ -448,7 +448,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
|
|
||||||
// region load
|
// region load
|
||||||
if (loads != null) {
|
if (loads != null) {
|
||||||
Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
|
Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
|
||||||
// That could have failed if the RegionLoad is using the other regionName
|
// That could have failed if the RegionLoad is using the other regionName
|
||||||
if (rl == null) {
|
if (rl == null) {
|
||||||
// Try getting the region load using encoded name.
|
// Try getting the region load using encoded name.
|
||||||
|
|
|
@ -113,7 +113,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
private static final Random RANDOM = new Random(System.currentTimeMillis());
|
private static final Random RANDOM = new Random(System.currentTimeMillis());
|
||||||
private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
|
private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
|
||||||
|
|
||||||
Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
|
Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<String, Deque<BalancerRegionLoad>>();
|
||||||
|
|
||||||
// values are defaults
|
// values are defaults
|
||||||
private int maxSteps = 1000000;
|
private int maxSteps = 1000000;
|
||||||
|
@ -498,8 +498,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
private synchronized void updateRegionLoad() {
|
private synchronized void updateRegionLoad() {
|
||||||
// We create a new hashmap so that regions that are no longer there are removed.
|
// We create a new hashmap so that regions that are no longer there are removed.
|
||||||
// However we temporarily need the old loads so we can use them to keep the rolling average.
|
// However we temporarily need the old loads so we can use them to keep the rolling average.
|
||||||
Map<String, Deque<RegionLoad>> oldLoads = loads;
|
Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
|
||||||
loads = new HashMap<String, Deque<RegionLoad>>();
|
loads = new HashMap<String, Deque<BalancerRegionLoad>>();
|
||||||
|
|
||||||
for (ServerName sn : clusterStatus.getServers()) {
|
for (ServerName sn : clusterStatus.getServers()) {
|
||||||
ServerLoad sl = clusterStatus.getLoad(sn);
|
ServerLoad sl = clusterStatus.getLoad(sn);
|
||||||
|
@ -507,16 +507,15 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
|
for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
|
||||||
Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
|
Deque<BalancerRegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
|
||||||
if (rLoads == null) {
|
if (rLoads == null) {
|
||||||
// There was nothing there
|
// There was nothing there
|
||||||
rLoads = new ArrayDeque<RegionLoad>();
|
rLoads = new ArrayDeque<BalancerRegionLoad>();
|
||||||
} else if (rLoads.size() >= numRegionLoadsToRemember) {
|
} else if (rLoads.size() >= numRegionLoadsToRemember) {
|
||||||
rLoads.remove();
|
rLoads.remove();
|
||||||
}
|
}
|
||||||
rLoads.add(entry.getValue());
|
rLoads.add(new BalancerRegionLoad(entry.getValue()));
|
||||||
loads.put(Bytes.toString(entry.getKey()), rLoads);
|
loads.put(Bytes.toString(entry.getKey()), rLoads);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1251,7 +1250,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
abstract static class CostFromRegionLoadFunction extends CostFunction {
|
abstract static class CostFromRegionLoadFunction extends CostFunction {
|
||||||
|
|
||||||
private ClusterStatus clusterStatus = null;
|
private ClusterStatus clusterStatus = null;
|
||||||
private Map<String, Deque<RegionLoad>> loads = null;
|
private Map<String, Deque<BalancerRegionLoad>> loads = null;
|
||||||
private double[] stats = null;
|
private double[] stats = null;
|
||||||
CostFromRegionLoadFunction(Configuration conf) {
|
CostFromRegionLoadFunction(Configuration conf) {
|
||||||
super(conf);
|
super(conf);
|
||||||
|
@ -1261,7 +1260,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
this.clusterStatus = status;
|
this.clusterStatus = status;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setLoads(Map<String, Deque<RegionLoad>> l) {
|
void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
|
||||||
this.loads = l;
|
this.loads = l;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1281,7 +1280,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
|
|
||||||
// for every region on this server get the rl
|
// for every region on this server get the rl
|
||||||
for(int regionIndex:cluster.regionsPerServer[i]) {
|
for(int regionIndex:cluster.regionsPerServer[i]) {
|
||||||
Collection<RegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
|
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
|
||||||
|
|
||||||
// Now if we found a region load get the type of cost that was requested.
|
// Now if we found a region load get the type of cost that was requested.
|
||||||
if (regionLoadList != null) {
|
if (regionLoadList != null) {
|
||||||
|
@ -1297,15 +1296,15 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
return costFromArray(stats);
|
return costFromArray(stats);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
|
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
|
||||||
double cost = 0;
|
double cost = 0;
|
||||||
for (RegionLoad rl : regionLoadList) {
|
for (BalancerRegionLoad rl : regionLoadList) {
|
||||||
cost += getCostFromRl(rl);
|
cost += getCostFromRl(rl);
|
||||||
}
|
}
|
||||||
return cost / regionLoadList.size();
|
return cost / regionLoadList.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract double getCostFromRl(RegionLoad rl);
|
protected abstract double getCostFromRl(BalancerRegionLoad rl);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1320,11 +1319,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
|
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
|
||||||
double cost = 0;
|
double cost = 0;
|
||||||
double previous = 0;
|
double previous = 0;
|
||||||
boolean isFirst = true;
|
boolean isFirst = true;
|
||||||
for (RegionLoad rl : regionLoadList) {
|
for (BalancerRegionLoad rl : regionLoadList) {
|
||||||
double current = getCostFromRl(rl);
|
double current = getCostFromRl(rl);
|
||||||
if (isFirst) {
|
if (isFirst) {
|
||||||
isFirst = false;
|
isFirst = false;
|
||||||
|
@ -1354,7 +1353,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected double getCostFromRl(RegionLoad rl) {
|
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||||
return rl.getReadRequestsCount();
|
return rl.getReadRequestsCount();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1375,7 +1374,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected double getCostFromRl(RegionLoad rl) {
|
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||||
return rl.getWriteRequestsCount();
|
return rl.getWriteRequestsCount();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1553,7 +1552,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected double getCostFromRl(RegionLoad rl) {
|
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||||
return rl.getMemStoreSizeMB();
|
return rl.getMemStoreSizeMB();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1573,7 +1572,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected double getCostFromRl(RegionLoad rl) {
|
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||||
return rl.getStorefileSizeMB();
|
return rl.getStorefileSizeMB();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,7 +69,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
||||||
ServerLoad sl = mock(ServerLoad.class);
|
ServerLoad sl = mock(ServerLoad.class);
|
||||||
|
|
||||||
RegionLoad rl = mock(RegionLoad.class);
|
RegionLoad rl = mock(RegionLoad.class);
|
||||||
when(rl.getStores()).thenReturn(i);
|
when(rl.getStorefileSizeMB()).thenReturn(i);
|
||||||
|
|
||||||
Map<byte[], RegionLoad> regionLoadMap =
|
Map<byte[], RegionLoad> regionLoadMap =
|
||||||
new TreeMap<byte[], RegionLoad>(Bytes.BYTES_COMPARATOR);
|
new TreeMap<byte[], RegionLoad>(Bytes.BYTES_COMPARATOR);
|
||||||
|
@ -85,11 +85,11 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
||||||
assertTrue(loadBalancer.loads.get(REGION_KEY) != null);
|
assertTrue(loadBalancer.loads.get(REGION_KEY) != null);
|
||||||
assertTrue(loadBalancer.loads.get(REGION_KEY).size() == 15);
|
assertTrue(loadBalancer.loads.get(REGION_KEY).size() == 15);
|
||||||
|
|
||||||
Queue<RegionLoad> loads = loadBalancer.loads.get(REGION_KEY);
|
Queue<BalancerRegionLoad> loads = loadBalancer.loads.get(REGION_KEY);
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while(loads.size() > 0) {
|
while(loads.size() > 0) {
|
||||||
RegionLoad rl = loads.remove();
|
BalancerRegionLoad rl = loads.remove();
|
||||||
assertEquals(i + (numClusterStatusToAdd - 15), rl.getStores());
|
assertEquals(i + (numClusterStatusToAdd - 15), rl.getStorefileSizeMB());
|
||||||
i ++;
|
i ++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -232,9 +232,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRegionLoadCost() {
|
public void testRegionLoadCost() {
|
||||||
List<RegionLoad> regionLoads = new ArrayList<>();
|
List<BalancerRegionLoad> regionLoads = new ArrayList<>();
|
||||||
for (int i = 1; i < 5; i++) {
|
for (int i = 1; i < 5; i++) {
|
||||||
RegionLoad regionLoad = mock(RegionLoad.class);
|
BalancerRegionLoad regionLoad = mock(BalancerRegionLoad.class);
|
||||||
when(regionLoad.getReadRequestsCount()).thenReturn(new Long(i));
|
when(regionLoad.getReadRequestsCount()).thenReturn(new Long(i));
|
||||||
when(regionLoad.getStorefileSizeMB()).thenReturn(i);
|
when(regionLoad.getStorefileSizeMB()).thenReturn(i);
|
||||||
regionLoads.add(regionLoad);
|
regionLoads.add(regionLoad);
|
||||||
|
|
Loading…
Reference in New Issue