HBASE-5959 Add other load balancers

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1344457 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-05-30 20:53:15 +00:00
parent 65fc2953bd
commit 4ebecd3eba
20 changed files with 2169 additions and 931 deletions

View File

@ -337,6 +337,10 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>

View File

@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
import org.apache.hadoop.hbase.master.handler.DeleteTableHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
@ -1135,6 +1136,8 @@ Server {
this.assignmentManager.getAssignmentsByTable();
List<RegionPlan> plans = new ArrayList<RegionPlan>();
//Give the balancer the current cluster state.
this.balancer.setClusterStatus(getClusterStatus());
for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
if (partialPlans != null) plans.addAll(partialPlans);

View File

@ -19,15 +19,15 @@
*/
package org.apache.hadoop.hbase.master;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import java.util.List;
import java.util.Map;
/**
* Makes decisions about the placement and movement of Regions across
* RegionServers.

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.master;
import java.io.Serializable;
import java.util.Comparator;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
@ -37,6 +40,19 @@ public class RegionPlan implements Comparable<RegionPlan> {
private final ServerName source;
private ServerName dest;
public static class RegionPlanComparator implements Comparator<RegionPlan>, Serializable {
private static final long serialVersionUID = 4213207330485734853L;
@Override
public int compare(RegionPlan l, RegionPlan r) {
long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId();
if (diff < 0) return -1;
if (diff > 0) return 1;
return 0;
}
}
/**
* Instantiate a plan for a region move, moving the specified region from
* the specified source server to the specified destination server.

View File

@ -0,0 +1,253 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import com.google.common.base.Joiner;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Sets;
/**
* The base class for load balancers. It provides the the functions used to by
* {@link AssignmentManager} to assign regions in the edge cases. It doesn't
* provide an implementation of the actual balancing algorithm.
*
*/
public abstract class BaseLoadBalancer implements LoadBalancer {
// slop for regions
private float slop;
private Configuration config;
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
protected MasterServices services;
@Override
public void setConf(Configuration conf) {
this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
if (slop < 0) slop = 0;
else if (slop > 1) slop = 1;
this.config = conf;
}
@Override
public Configuration getConf() {
return this.config;
}
public void setClusterStatus(ClusterStatus st) {
// Not used except for the StocasticBalancer
}
public void setMasterServices(MasterServices masterServices) {
this.services = masterServices;
}
protected boolean needsBalance(ClusterLoadState cs) {
// Check if we even need to do any load balancing
float average = cs.getLoadAverage(); // for logging
// HBASE-3681 check sloppiness first
int floor = (int) Math.floor(average * (1 - slop));
int ceiling = (int) Math.ceil(average * (1 + slop));
return cs.getMinLoad() > ceiling || cs.getMaxLoad() < floor;
}
/**
* Generates a bulk assignment plan to be used on cluster startup using a
* simple round-robin assignment.
* <p>
* Takes a list of all the regions and all the servers in the cluster and
* returns a map of each server to the regions that it should be assigned.
* <p>
* Currently implemented as a round-robin assignment. Same invariant as load
* balancing, all servers holding floor(avg) or ceiling(avg).
*
* TODO: Use block locations from HDFS to place regions with their blocks
*
* @param regions all regions
* @param servers all servers
* @return map of server to the regions it should take, or null if no
* assignment is possible (ie. no regions or no servers)
*/
public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
List<ServerName> servers) {
if (regions.isEmpty() || servers.isEmpty()) {
return null;
}
Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
int numRegions = regions.size();
int numServers = servers.size();
int max = (int) Math.ceil((float) numRegions / numServers);
int serverIdx = 0;
if (numServers > 1) {
serverIdx = RANDOM.nextInt(numServers);
}
int regionIdx = 0;
for (int j = 0; j < numServers; j++) {
ServerName server = servers.get((j + serverIdx) % numServers);
List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
for (int i = regionIdx; i < numRegions; i += numServers) {
serverRegions.add(regions.get(i % numRegions));
}
assignments.put(server, serverRegions);
regionIdx++;
}
return assignments;
}
/**
* Generates an immediate assignment plan to be used by a new master for
* regions in transition that do not have an already known destination.
*
* Takes a list of regions that need immediate assignment and a list of all
* available servers. Returns a map of regions to the server they should be
* assigned to.
*
* This method will return quickly and does not do any intelligent balancing.
* The goal is to make a fast decision not the best decision possible.
*
* Currently this is random.
*
* @param regions
* @param servers
* @return map of regions to the server it should be assigned to
*/
public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions,
List<ServerName> servers) {
Map<HRegionInfo, ServerName> assignments = new TreeMap<HRegionInfo, ServerName>();
for (HRegionInfo region : regions) {
assignments.put(region, randomAssignment(region, servers));
}
return assignments;
}
/**
* Used to assign a single region to a random server.
*/
public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
if (servers == null || servers.isEmpty()) {
LOG.warn("Wanted to do random assignment but no servers to assign to");
return null;
}
return servers.get(RANDOM.nextInt(servers.size()));
}
/**
* Generates a bulk assignment startup plan, attempting to reuse the existing
* assignment information from META, but adjusting for the specified list of
* available/online servers available for assignment.
* <p>
* Takes a map of all regions to their existing assignment from META. Also
* takes a list of online servers for regions to be assigned to. Attempts to
* retain all assignment, so in some instances initial assignment will not be
* completely balanced.
* <p>
* Any leftover regions without an existing server to be assigned to will be
* assigned randomly to available servers.
*
* @param regions regions and existing assignment from meta
* @param servers available servers
* @return map of servers and regions to be assigned to them
*/
public Map<ServerName, List<HRegionInfo>> retainAssignment(Map<HRegionInfo, ServerName> regions,
List<ServerName> servers) {
// Group all of the old assignments by their hostname.
// We can't group directly by ServerName since the servers all have
// new start-codes.
// Group the servers by their hostname. It's possible we have multiple
// servers on the same host on different ports.
ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
for (ServerName server : servers) {
serversByHostname.put(server.getHostname(), server);
}
// Now come up with new assignments
Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
for (ServerName server : servers) {
assignments.put(server, new ArrayList<HRegionInfo>());
}
// Collection of the hostnames that used to have regions
// assigned, but for which we no longer have any RS running
// after the cluster restart.
Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
int numRandomAssignments = 0;
int numRetainedAssigments = 0;
for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
HRegionInfo region = entry.getKey();
ServerName oldServerName = entry.getValue();
List<ServerName> localServers = new ArrayList<ServerName>();
if (oldServerName != null) {
localServers = serversByHostname.get(oldServerName.getHostname());
}
if (localServers.isEmpty()) {
// No servers on the new cluster match up with this hostname,
// assign randomly.
ServerName randomServer = servers.get(RANDOM.nextInt(servers.size()));
assignments.get(randomServer).add(region);
numRandomAssignments++;
if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
} else if (localServers.size() == 1) {
// the usual case - one new server on same host
assignments.get(localServers.get(0)).add(region);
numRetainedAssigments++;
} else {
// multiple new servers in the cluster on this same host
int size = localServers.size();
ServerName target = localServers.get(RANDOM.nextInt(size));
assignments.get(target).add(region);
numRetainedAssigments++;
}
}
String randomAssignMsg = "";
if (numRandomAssignments > 0) {
randomAssignMsg =
numRandomAssignments + " regions were assigned "
+ "to random hosts, since the old hosts for these regions are no "
+ "longer present in the cluster. These hosts were:\n "
+ Joiner.on("\n ").join(oldHostsNoLongerPresent);
}
LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
+ " retained the pre-restart assignment. " + randomAssignMsg);
return assignments;
}
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
/**
* Class used to hold the current state of the cluster and how balanced it is.
*/
public class ClusterLoadState {
private final Map<ServerName, List<HRegionInfo>> clusterState;
private final NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad;
private boolean emptyRegionServerPresent = false;
private int numRegions = 0;
private int numServers = 0;
public ClusterLoadState(Map<ServerName, List<HRegionInfo>> clusterState) {
super();
this.numRegions = 0;
this.numServers = clusterState.size();
this.clusterState = clusterState;
serversByLoad = new TreeMap<ServerAndLoad, List<HRegionInfo>>();
// Iterate so we can count regions as we build the map
for (Map.Entry<ServerName, List<HRegionInfo>> server : clusterState.entrySet()) {
List<HRegionInfo> regions = server.getValue();
int sz = regions.size();
if (sz == 0) emptyRegionServerPresent = true;
numRegions += sz;
serversByLoad.put(new ServerAndLoad(server.getKey(), sz), regions);
}
}
Map<ServerName, List<HRegionInfo>> getClusterState() {
return clusterState;
}
NavigableMap<ServerAndLoad, List<HRegionInfo>> getServersByLoad() {
return serversByLoad;
}
boolean isEmptyRegionServerPresent() {
return emptyRegionServerPresent;
}
int getNumRegions() {
return numRegions;
}
int getNumServers() {
return numServers;
}
float getLoadAverage() {
return (float) numRegions / numServers;
}
int getMinLoad() {
return getServersByLoad().lastKey().getLoad();
}
int getMaxLoad() {
return getServersByLoad().firstKey().getLoad();
}
}

View File

@ -1,6 +1,4 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,41 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
package org.apache.hadoop.hbase.master.balancer;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import com.google.common.base.Joiner;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
/**
* Makes decisions about the placement and movement of Regions across
@ -70,69 +54,45 @@ import com.google.common.collect.Sets;
* <p>This classes produces plans for the {@link AssignmentManager} to execute.
*/
@InterfaceAudience.Private
public class DefaultLoadBalancer implements LoadBalancer {
private static final Log LOG = LogFactory.getLog(LoadBalancer.class);
public class DefaultLoadBalancer extends BaseLoadBalancer {
private static final Log LOG = LogFactory.getLog(DefaultLoadBalancer.class);
private static final Random RANDOM = new Random(System.currentTimeMillis());
// slop for regions
private float slop;
private Configuration config;
private ClusterStatus status;
private MasterServices services;
public void setClusterStatus(ClusterStatus st) {
this.status = st;
}
public void setMasterServices(MasterServices masterServices) {
this.services = masterServices;
}
@Override
public void setConf(Configuration conf) {
this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
if (slop < 0) slop = 0;
else if (slop > 1) slop = 1;
this.config = conf;
}
@Override
public Configuration getConf() {
return this.config;
}
/*
* The following comparator assumes that RegionId from HRegionInfo can
* represent the age of the region - larger RegionId means the region
* is younger.
* This comparator is used in balanceCluster() to account for the out-of-band
* regions which were assigned to the server after some other region server
* crashed.
*/
private static class RegionInfoComparator implements Comparator<HRegionInfo> {
@Override
public int compare(HRegionInfo l, HRegionInfo r) {
long diff = r.getRegionId() - l.getRegionId();
if (diff < 0) return -1;
if (diff > 0) return 1;
return 0;
}
}
private RegionInfoComparator riComparator = new RegionInfoComparator();
private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
RegionInfoComparator riComparator = new RegionInfoComparator();
private static class RegionPlanComparator implements Comparator<RegionPlan> {
@Override
public int compare(RegionPlan l, RegionPlan r) {
long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId();
if (diff < 0) return -1;
if (diff > 0) return 1;
return 0;
/**
* Stores additional per-server information about the regions added/removed
* during the run of the balancing algorithm.
*
* For servers that shed regions, we need to track which regions we have already
* shed. <b>nextRegionForUnload</b> contains the index in the list of regions on
* the server that is the next to be shed.
*/
static class BalanceInfo {
private final int nextRegionForUnload;
private int numRegionsAdded;
public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
this.nextRegionForUnload = nextRegionForUnload;
this.numRegionsAdded = numRegionsAdded;
}
int getNextRegionForUnload() {
return nextRegionForUnload;
}
int getNumRegionsAdded() {
return numRegionsAdded;
}
void setNumRegionsAdded(int numAdded) {
this.numRegionsAdded = numAdded;
}
}
RegionPlanComparator rpComparator = new RegionPlanComparator();
/**
* Generate a global load balancing plan according to the specified map of
* server information to the most loaded regions of each server.
@ -219,34 +179,25 @@ public class DefaultLoadBalancer implements LoadBalancer {
* or null if cluster is already balanced
*/
public List<RegionPlan> balanceCluster(
Map<ServerName, List<HRegionInfo>> clusterState) {
Map<ServerName, List<HRegionInfo>> clusterMap) {
boolean emptyRegionServerPresent = false;
long startTime = System.currentTimeMillis();
int numServers = clusterState.size();
ClusterLoadState cs = new ClusterLoadState(clusterMap);
int numServers = cs.getNumServers();
if (numServers == 0) {
LOG.debug("numServers=0 so skipping load balancing");
return null;
}
NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad =
new TreeMap<ServerAndLoad, List<HRegionInfo>>();
int numRegions = 0;
// Iterate so we can count regions as we build the map
for (Map.Entry<ServerName, List<HRegionInfo>> server: clusterState.entrySet()) {
List<HRegionInfo> regions = server.getValue();
int sz = regions.size();
if (sz == 0) emptyRegionServerPresent = true;
numRegions += sz;
serversByLoad.put(new ServerAndLoad(server.getKey(), sz), regions);
}
// Check if we even need to do any load balancing
float average = (float)numRegions / numServers; // for logging
// HBASE-3681 check sloppiness first
int floor = (int) Math.floor(average * (1 - slop));
int ceiling = (int) Math.ceil(average * (1 + slop));
if (serversByLoad.lastKey().getLoad() <= ceiling &&
serversByLoad.firstKey().getLoad() >= floor) {
NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
int numRegions = cs.getNumRegions();
if (!this.needsBalance(cs)) {
// Skipped because no server outside (min,max) range
float average = cs.getLoadAverage(); // for logging
LOG.info("Skipping load balancing because balanced cluster; " +
"servers=" + numServers + " " +
"regions=" + numRegions + " average=" + average + " " +
@ -254,6 +205,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
" leastloaded=" + serversByLoad.firstKey().getLoad());
return null;
}
int min = numRegions / numServers;
int max = numRegions % numServers == 0 ? min : min + 1;
@ -452,7 +404,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
", serversUnderloaded=" + serversUnderloaded);
StringBuilder sb = new StringBuilder();
for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterState.entrySet()) {
for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterMap.entrySet()) {
if (sb.length() > 0) sb.append(", ");
sb.append(e.getKey().toString());
sb.append(" ");
@ -473,7 +425,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
/**
* Add a region from the head or tail to the List of regions to return.
*/
void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
RegionPlan rp = null;
if (!fetchFromTail) rp = regionsToMove.remove();
@ -481,291 +433,4 @@ public class DefaultLoadBalancer implements LoadBalancer {
rp.setDestination(sn);
regionsToReturn.add(rp);
}
/**
* Stores additional per-server information about the regions added/removed
* during the run of the balancing algorithm.
*
* For servers that shed regions, we need to track which regions we have
* already shed. <b>nextRegionForUnload</b> contains the index in the list
* of regions on the server that is the next to be shed.
*/
private static class BalanceInfo {
private final int nextRegionForUnload;
private int numRegionsAdded;
public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
this.nextRegionForUnload = nextRegionForUnload;
this.numRegionsAdded = numRegionsAdded;
}
public int getNextRegionForUnload() {
return nextRegionForUnload;
}
public int getNumRegionsAdded() {
return numRegionsAdded;
}
public void setNumRegionsAdded(int numAdded) {
this.numRegionsAdded = numAdded;
}
}
/**
* Generates a bulk assignment plan to be used on cluster startup using a
* simple round-robin assignment.
* <p>
* Takes a list of all the regions and all the servers in the cluster and
* returns a map of each server to the regions that it should be assigned.
* <p>
* Currently implemented as a round-robin assignment. Same invariant as
* load balancing, all servers holding floor(avg) or ceiling(avg).
*
* TODO: Use block locations from HDFS to place regions with their blocks
*
* @param regions all regions
* @param servers all servers
* @return map of server to the regions it should take, or null if no
* assignment is possible (ie. no regions or no servers)
*/
public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(
List<HRegionInfo> regions, List<ServerName> servers) {
if (regions.isEmpty() || servers.isEmpty()) {
return null;
}
Map<ServerName, List<HRegionInfo>> assignments =
new TreeMap<ServerName,List<HRegionInfo>>();
int numRegions = regions.size();
int numServers = servers.size();
int max = (int)Math.ceil((float)numRegions/numServers);
int serverIdx = 0;
if (numServers > 1) {
serverIdx = RANDOM.nextInt(numServers);
}
int regionIdx = 0;
for (int j = 0; j < numServers; j++) {
ServerName server = servers.get((j + serverIdx) % numServers);
List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
for (int i=regionIdx; i<numRegions; i += numServers) {
serverRegions.add(regions.get(i % numRegions));
}
assignments.put(server, serverRegions);
regionIdx++;
}
return assignments;
}
/**
* Generates a bulk assignment startup plan, attempting to reuse the existing
* assignment information from META, but adjusting for the specified list of
* available/online servers available for assignment.
* <p>
* Takes a map of all regions to their existing assignment from META. Also
* takes a list of online servers for regions to be assigned to. Attempts to
* retain all assignment, so in some instances initial assignment will not be
* completely balanced.
* <p>
* Any leftover regions without an existing server to be assigned to will be
* assigned randomly to available servers.
* @param regions regions and existing assignment from meta
* @param servers available servers
* @return map of servers and regions to be assigned to them
*/
public Map<ServerName, List<HRegionInfo>> retainAssignment(
Map<HRegionInfo, ServerName> regions, List<ServerName> servers) {
// Group all of the old assignments by their hostname.
// We can't group directly by ServerName since the servers all have
// new start-codes.
// Group the servers by their hostname. It's possible we have multiple
// servers on the same host on different ports.
ArrayListMultimap<String, ServerName> serversByHostname =
ArrayListMultimap.create();
for (ServerName server : servers) {
serversByHostname.put(server.getHostname(), server);
}
// Now come up with new assignments
Map<ServerName, List<HRegionInfo>> assignments =
new TreeMap<ServerName, List<HRegionInfo>>();
for (ServerName server : servers) {
assignments.put(server, new ArrayList<HRegionInfo>());
}
// Collection of the hostnames that used to have regions
// assigned, but for which we no longer have any RS running
// after the cluster restart.
Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
int numRandomAssignments = 0;
int numRetainedAssigments = 0;
for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
HRegionInfo region = entry.getKey();
ServerName oldServerName = entry.getValue();
List<ServerName> localServers = new ArrayList<ServerName>();
if (oldServerName != null) {
localServers = serversByHostname.get(oldServerName.getHostname());
}
if (localServers.isEmpty()) {
// No servers on the new cluster match up with this hostname,
// assign randomly.
ServerName randomServer = servers.get(RANDOM.nextInt(servers.size()));
assignments.get(randomServer).add(region);
numRandomAssignments++;
if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
} else if (localServers.size() == 1) {
// the usual case - one new server on same host
assignments.get(localServers.get(0)).add(region);
numRetainedAssigments++;
} else {
// multiple new servers in the cluster on this same host
int size = localServers.size();
ServerName target = localServers.get(RANDOM.nextInt(size));
assignments.get(target).add(region);
numRetainedAssigments++;
}
}
String randomAssignMsg = "";
if (numRandomAssignments > 0) {
randomAssignMsg = numRandomAssignments + " regions were assigned " +
"to random hosts, since the old hosts for these regions are no " +
"longer present in the cluster. These hosts were:\n " +
Joiner.on("\n ").join(oldHostsNoLongerPresent);
}
LOG.info("Reassigned " + regions.size() + " regions. " +
numRetainedAssigments + " retained the pre-restart assignment. " +
randomAssignMsg);
return assignments;
}
/**
* Returns an ordered list of hosts that are hosting the blocks for this
* region. The weight of each host is the sum of the block lengths of all
* files on that host, so the first host in the list is the server which
* holds the most bytes of the given region's HFiles.
*
* @param fs the filesystem
* @param region region
* @return ordered list of hosts holding blocks of the specified region
*/
@SuppressWarnings("unused")
private List<ServerName> getTopBlockLocations(FileSystem fs,
HRegionInfo region) {
List<ServerName> topServerNames = null;
try {
HTableDescriptor tableDescriptor = getTableDescriptor(
region.getTableName());
if (tableDescriptor != null) {
HDFSBlocksDistribution blocksDistribution =
HRegion.computeHDFSBlocksDistribution(config, tableDescriptor,
region.getEncodedName());
List<String> topHosts = blocksDistribution.getTopHosts();
topServerNames = mapHostNameToServerName(topHosts);
}
} catch (IOException ioe) {
LOG.debug("IOException during HDFSBlocksDistribution computation. for " +
"region = " + region.getEncodedName() , ioe);
}
return topServerNames;
}
/**
* return HTableDescriptor for a given tableName
* @param tableName the table name
* @return HTableDescriptor
* @throws IOException
*/
private HTableDescriptor getTableDescriptor(byte[] tableName)
throws IOException {
HTableDescriptor tableDescriptor = null;
try {
if ( this.services != null)
{
tableDescriptor = this.services.getTableDescriptors().
get(Bytes.toString(tableName));
}
} catch (FileNotFoundException fnfe) {
LOG.debug("FileNotFoundException during getTableDescriptors."
+ " Current table name = " + Bytes.toStringBinary(tableName), fnfe);
}
return tableDescriptor;
}
/**
* Map hostname to ServerName, The output ServerName list will have the same
* order as input hosts.
* @param hosts the list of hosts
* @return ServerName list
*/
private List<ServerName> mapHostNameToServerName(List<String> hosts) {
if ( hosts == null || status == null) {
return null;
}
List<ServerName> topServerNames = new ArrayList<ServerName>();
Collection<ServerName> regionServers = status.getServers();
// create a mapping from hostname to ServerName for fast lookup
HashMap<String, ServerName> hostToServerName =
new HashMap<String, ServerName>();
for (ServerName sn : regionServers) {
hostToServerName.put(sn.getHostname(), sn);
}
for (String host : hosts ) {
ServerName sn = hostToServerName.get(host);
// it is possible that HDFS is up ( thus host is valid ),
// but RS is down ( thus sn is null )
if (sn != null) {
topServerNames.add(sn);
}
}
return topServerNames;
}
/**
* Generates an immediate assignment plan to be used by a new master for
* regions in transition that do not have an already known destination.
*
* Takes a list of regions that need immediate assignment and a list of
* all available servers. Returns a map of regions to the server they
* should be assigned to.
*
* This method will return quickly and does not do any intelligent
* balancing. The goal is to make a fast decision not the best decision
* possible.
*
* Currently this is random.
*
* @param regions
* @param servers
* @return map of regions to the server it should be assigned to
*/
public Map<HRegionInfo, ServerName> immediateAssignment(
List<HRegionInfo> regions, List<ServerName> servers) {
Map<HRegionInfo,ServerName> assignments =
new TreeMap<HRegionInfo,ServerName>();
for(HRegionInfo region : regions) {
assignments.put(region, servers.get(RANDOM.nextInt(servers.size())));
}
return assignments;
}
public ServerName randomAssignment(HRegionInfo regionInfo,
List<ServerName> servers) {
if (servers == null || servers.isEmpty()) {
LOG.warn("Wanted to do random assignment but no servers to assign to");
return null;
}
return servers.get(RANDOM.nextInt(servers.size()));
}
}

View File

@ -1,6 +1,4 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,12 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.util.ReflectionUtils;
/**
@ -39,9 +37,9 @@ public class LoadBalancerFactory {
public static LoadBalancer getLoadBalancer(Configuration conf) {
// Create the balancer
Class<? extends LoadBalancer> balancerKlass = conf.getClass(
HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
DefaultLoadBalancer.class, LoadBalancer.class);
Class<? extends LoadBalancer> balancerKlass =
conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, DefaultLoadBalancer.class,
LoadBalancer.class);
return ReflectionUtils.newInstance(balancerKlass, conf);
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.Comparator;
import org.apache.hadoop.hbase.HRegionInfo;
/**
* The following comparator assumes that RegionId from HRegionInfo can represent
* the age of the region - larger RegionId means the region is younger. This
* comparator is used in balanceCluster() to account for the out-of-band regions
* which were assigned to the server after some other region server crashed.
*/
class RegionInfoComparator implements Comparator<HRegionInfo> {
@Override
public int compare(HRegionInfo l, HRegionInfo r) {
long diff = r.getRegionId() - l.getRegionId();
if (diff < 0) return -1;
if (diff > 0) return 1;
return 0;
}
}

View File

@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
/**
* This will find where data for a region is located in HDFS. It ranks
* {@link ServerName}'s by the size of the store files they are holding for a
* given region.
*
*/
class RegionLocationFinder {
private static Log LOG = LogFactory.getLog(RegionLocationFinder.class);
private Configuration conf;
private ClusterStatus status;
private MasterServices services;
private CacheLoader<HRegionInfo, List<ServerName>> loader =
new CacheLoader<HRegionInfo, List<ServerName>>() {
@Override
public List<ServerName> load(HRegionInfo key) throws Exception {
List<ServerName> servers = internalGetTopBlockLocation(key);
if (servers == null) {
return new LinkedList<ServerName>();
}
return servers;
}
};
// The cache for where regions are located.
private LoadingCache<HRegionInfo, List<ServerName>> cache = null;
/**
* Create a cache for region to list of servers
* @param mins Number of mins to cache
* @return A new Cache.
*/
private LoadingCache<HRegionInfo, List<ServerName>> createCache(int mins) {
return CacheBuilder.newBuilder().expireAfterAccess(mins, TimeUnit.MINUTES).build(loader);
}
public Configuration getConf() {
return conf;
}
public void setConf(Configuration conf) {
this.conf = conf;
cache = createCache(conf.getInt("hbase.master.balancer.regionLocationCacheTime", 30));
}
public void setServices(MasterServices services) {
this.services = services;
}
public void setClusterStatus(ClusterStatus status) {
this.status = status;
}
protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
List<ServerName> servers = null;
try {
servers = cache.get(region);
} catch (ExecutionException ex) {
servers = new LinkedList<ServerName>();
}
return servers;
}
/**
* Returns an ordered list of hosts that are hosting the blocks for this
* region. The weight of each host is the sum of the block lengths of all
* files on that host, so the first host in the list is the server which holds
* the most bytes of the given region's HFiles.
*
* @param fs the filesystem
* @param region region
* @return ordered list of hosts holding blocks of the specified region
*/
protected List<ServerName> internalGetTopBlockLocation(HRegionInfo region) {
List<ServerName> topServerNames = null;
try {
HTableDescriptor tableDescriptor = getTableDescriptor(region.getTableName());
if (tableDescriptor != null) {
HDFSBlocksDistribution blocksDistribution =
HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor,
region.getEncodedName());
List<String> topHosts = blocksDistribution.getTopHosts();
topServerNames = mapHostNameToServerName(topHosts);
}
} catch (IOException ioe) {
LOG.debug("IOException during HDFSBlocksDistribution computation. for " + "region = "
+ region.getEncodedName(), ioe);
}
return topServerNames;
}
/**
* return HTableDescriptor for a given tableName
*
* @param tableName the table name
* @return HTableDescriptor
* @throws IOException
*/
protected HTableDescriptor getTableDescriptor(byte[] tableName) throws IOException {
HTableDescriptor tableDescriptor = null;
try {
if (this.services != null) {
tableDescriptor = this.services.getTableDescriptors().get(Bytes.toString(tableName));
}
} catch (FileNotFoundException fnfe) {
LOG.debug("FileNotFoundException during getTableDescriptors." + " Current table name = "
+ Bytes.toStringBinary(tableName), fnfe);
}
return tableDescriptor;
}
/**
* Map hostname to ServerName, The output ServerName list will have the same
* order as input hosts.
*
* @param hosts the list of hosts
* @return ServerName list
*/
protected List<ServerName> mapHostNameToServerName(List<String> hosts) {
if (hosts == null || status == null) {
return null;
}
List<ServerName> topServerNames = new ArrayList<ServerName>();
Collection<ServerName> regionServers = status.getServers();
// create a mapping from hostname to ServerName for fast lookup
HashMap<String, ServerName> hostToServerName = new HashMap<String, ServerName>();
for (ServerName sn : regionServers) {
hostToServerName.put(sn.getHostname(), sn);
}
for (String host : hosts) {
ServerName sn = hostToServerName.get(host);
// it is possible that HDFS is up ( thus host is valid ),
// but RS is down ( thus sn is null )
if (sn != null) {
topServerNames.add(sn);
}
}
return topServerNames;
}
}

View File

@ -15,8 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
package org.apache.hadoop.hbase.master.balancer;
import java.io.Serializable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
@ -25,7 +26,8 @@ import org.apache.hadoop.hbase.ServerName;
* Data structure that holds servername and 'load'.
*/
@InterfaceAudience.Private
class ServerAndLoad implements Comparable<ServerAndLoad> {
class ServerAndLoad implements Comparable<ServerAndLoad>, Serializable {
private static final long serialVersionUID = 2735470854607296965L;
private final ServerName sn;
private final int load;
@ -47,4 +49,13 @@ class ServerAndLoad implements Comparable<ServerAndLoad> {
int diff = this.load - other.load;
return diff != 0 ? diff : this.sn.compareTo(other.getServerName());
}
@Override
public boolean equals(Object o) {
if (o instanceof ServerAndLoad) {
ServerAndLoad sl = (ServerAndLoad) o;
return this.compareTo(sl) == 0;
}
return false;
}
}

View File

@ -0,0 +1,696 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HServerLoad.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
/**
* <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
* randomly try and mutate the cluster to Cprime. If F(Cprime) < F(C) then the
* new cluster state becomes the plan. It includes costs functions to compute the cost of:</p>
* <ul>
* <li>Region Load</li>
* <li>Table Load</li>
* <li>Data Locality</li>
* <li>Memstore Sizes</li>
* <li>Storefile Sizes</li>
* </ul>
*
*
* <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
* best solution, and 1 is the highest possible cost and the worst solution. The computed costs are
* scaled by their respective multipliers:</p>
*
* <ul>
* <li>hbase.master.balancer.stochastic.regionLoadCost</li>
* <li>hbase.master.balancer.stochastic.moveCost</li>
* <li>hbase.master.balancer.stochastic.tableLoadCost</li>
* <li>hbase.master.balancer.stochastic.localityCost</li>
* <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
* <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
* </ul>
*
* <p>In addition to the above configurations, the balancer can be tuned by the following
* configuration values:</p>
* <ul>
* <li>hbase.master.balancer.stochastic.maxMoveRegions which
* controls what the max number of regions that can be moved in a single invocation of this
* balancer.</li>
* <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
* regions is multiplied to try and get the number of times the balancer will
* mutate all servers.</li>
* <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
* the balancer will try and mutate all the servers. The balancer will use the minimum of this
* value and the above computation.</li>
* </ul>
*
* <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
* so that the balancer gets the full picture of all loads on the cluster.</p>
*/
@InterfaceAudience.Private
public class StochasticLoadBalancer extends BaseLoadBalancer {
private static final String STOREFILE_SIZE_COST_KEY =
"hbase.master.balancer.stochastic.storefileSizeCost";
private static final String MEMSTORE_SIZE_COST_KEY =
"hbase.master.balancer.stochastic.memstoreSizeCost";
private static final String WRITE_REQUEST_COST_KEY =
"hbase.master.balancer.stochastic.writeRequestCost";
private static final String READ_REQUEST_COST_KEY =
"hbase.master.balancer.stochastic.readRequestCost";
private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
private static final String TABLE_LOAD_COST_KEY =
"hbase.master.balancer.stochastic.tableLoadCost";
private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
private static final String REGION_LOAD_COST_KEY =
"hbase.master.balancer.stochastic.regionLoadCost";
private static final String STEPS_PER_REGION_KEY =
"hbase.master.balancer.stochastic.stepsPerRegion";
private static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
private static final String MAX_MOVES_KEY = "hbase.master.balancer.stochastic.maxMoveRegions";
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
private final RegionLocationFinder regionFinder = new RegionLocationFinder();
private ClusterStatus clusterStatus = null;
private Map<String, RegionLoad> loads = new HashMap<String, RegionLoad>();
// values are defaults
private int maxSteps = 15000;
private int stepsPerRegion = 110;
private int maxMoves = 600;
private float loadMultiplier = 55;
private float moveCostMultiplier = 5;
private float tableMultiplier = 5;
private float localityMultiplier = 5;
private float readRequestMultiplier = 0;
private float writeRequestMultiplier = 0;
private float memStoreSizeMultiplier = 5;
private float storeFileSizeMultiplier = 5;
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
regionFinder.setConf(conf);
maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
maxMoves = conf.getInt(MAX_MOVES_KEY, maxMoves);
stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
// Load multiplier should be the greatest as it is the most general way to balance data.
loadMultiplier = conf.getFloat(REGION_LOAD_COST_KEY, loadMultiplier);
// Move cost multiplier should be the same cost or higer than the rest of the costs to ensure
// that two costs must get better to justify a move cost.
moveCostMultiplier = conf.getFloat(MOVE_COST_KEY, moveCostMultiplier);
// These are the added costs so that the stochastic load balancer can get a little bit smarter
// about where to move regions.
tableMultiplier = conf.getFloat(TABLE_LOAD_COST_KEY, tableMultiplier);
localityMultiplier = conf.getFloat(LOCALITY_COST_KEY, localityMultiplier);
memStoreSizeMultiplier = conf.getFloat(MEMSTORE_SIZE_COST_KEY, memStoreSizeMultiplier);
storeFileSizeMultiplier = conf.getFloat(STOREFILE_SIZE_COST_KEY, storeFileSizeMultiplier);
// These are not used currently.
// TODO: Start using these once rolling averages are implemented for read/write load.
readRequestMultiplier = conf.getFloat(READ_REQUEST_COST_KEY, readRequestMultiplier);
writeRequestMultiplier = conf.getFloat(WRITE_REQUEST_COST_KEY, writeRequestMultiplier);
}
@Override
public void setClusterStatus(ClusterStatus st) {
super.setClusterStatus(st);
regionFinder.setClusterStatus(st);
this.clusterStatus = st;
updateRegionLoad();
}
@Override
public void setMasterServices(MasterServices masterServices) {
super.setMasterServices(masterServices);
this.services = masterServices;
this.regionFinder.setServices(masterServices);
}
/**
* Given the cluster state this will try and approach an optimal balance. This
* should always approach the optimal state given enough steps.
*/
@Override
public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
// No need to balance a one node cluster.
if (clusterState.size() <= 1) {
LOG.debug("Skipping load balance as cluster has only one node.");
return null;
}
long startTime = System.currentTimeMillis();
// Keep track of servers to iterate through them.
List<ServerName> servers = new ArrayList<ServerName>(clusterState.keySet());
Map<HRegionInfo, ServerName> initialRegionMapping = createRegionMapping(clusterState);
double currentCost, newCost, initCost;
currentCost = newCost = initCost = computeCost(initialRegionMapping, clusterState);
int computedMaxSteps =
Math.min(this.maxSteps, (initialRegionMapping.size() * this.stepsPerRegion));
// Perform a stochastic walk to see if we can get a good fit.
for (int step = 0; step < computedMaxSteps; step++) {
// try and perform a mutation
for (ServerName leftServer : servers) {
// What server are we going to be swapping regions with ?
ServerName rightServer = pickOtherServer(leftServer, servers);
if (rightServer == null) {
continue;
}
// Get the regions.
List<HRegionInfo> leftRegionList = clusterState.get(leftServer);
List<HRegionInfo> rightRegionList = clusterState.get(rightServer);
// Pick what regions to swap around.
// If we get a null for one then this isn't a swap just a move
HRegionInfo lRegion = pickRandomRegion(leftRegionList, 0);
HRegionInfo rRegion = pickRandomRegion(rightRegionList, 0.5);
// We randomly picked to do nothing.
if (lRegion == null && rRegion == null) {
continue;
}
if (rRegion != null) {
leftRegionList.add(rRegion);
}
if (lRegion != null) {
rightRegionList.add(lRegion);
}
newCost = computeCost(initialRegionMapping, clusterState);
// Should this be kept?
if (newCost < currentCost) {
currentCost = newCost;
} else {
// Put things back the way they were before.
if (rRegion != null) {
leftRegionList.remove(rRegion);
rightRegionList.add(rRegion);
}
if (lRegion != null) {
rightRegionList.remove(lRegion);
leftRegionList.add(lRegion);
}
}
}
}
long endTime = System.currentTimeMillis();
if (initCost > currentCost) {
List<RegionPlan> plans = createRegionPlans(initialRegionMapping, clusterState);
LOG.debug("Finished computing new laod balance plan. Computation took "
+ (endTime - startTime) + "ms to try " + computedMaxSteps
+ " different iterations. Found a solution that moves " + plans.size()
+ " regions; Going from a computed cost of " + initCost + " to a new cost of "
+ currentCost);
return plans;
}
LOG.debug("Could not find a better load balance plan. Tried " + computedMaxSteps
+ " different configurations in " + (endTime - startTime)
+ "ms, and did not find anything with a computed cost less than " + initCost);
return null;
}
/**
* Create all of the RegionPlan's needed to move from the initial cluster state to the desired
* state.
*
* @param initialRegionMapping Initial mapping of Region to Server
* @param clusterState The desired mapping of ServerName to Regions
* @return List of RegionPlan's that represent the moves needed to get to desired final state.
*/
private List<RegionPlan> createRegionPlans(Map<HRegionInfo, ServerName> initialRegionMapping,
Map<ServerName, List<HRegionInfo>> clusterState) {
List<RegionPlan> plans = new LinkedList<RegionPlan>();
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
ServerName newServer = entry.getKey();
for (HRegionInfo region : entry.getValue()) {
ServerName initialServer = initialRegionMapping.get(region);
if (!newServer.equals(initialServer)) {
LOG.trace("Moving Region " + region.getEncodedName() + " from server "
+ initialServer.getHostname() + " to " + newServer.getHostname());
RegionPlan rp = new RegionPlan(region, initialServer, newServer);
plans.add(rp);
}
}
}
return plans;
}
/**
* Create a map that will represent the initial location of regions on a
* {@link ServerName}
*
* @param clusterState starting state of the cluster and regions.
* @return A map of {@link HRegionInfo} to the {@link ServerName} that is
* currently hosting that region
*/
private Map<HRegionInfo, ServerName> createRegionMapping(
Map<ServerName, List<HRegionInfo>> clusterState) {
Map<HRegionInfo, ServerName> mapping = new HashMap<HRegionInfo, ServerName>();
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
for (HRegionInfo region : entry.getValue()) {
mapping.put(region, entry.getKey());
}
}
return mapping;
}
/** Store the current region loads. */
private void updateRegionLoad() {
loads.clear();
for (ServerName sn : clusterStatus.getServers()) {
HServerLoad hsl = clusterStatus.getLoad(sn);
if (hsl == null) continue;
for (Entry<byte[], RegionLoad> entry : hsl.getRegionsLoad().entrySet()) {
loads.put(Bytes.toString(entry.getKey()), entry.getValue());
}
}
}
/**
* From a list of regions pick a random one. Null can be returned which
* {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
* rather than swap.
*
* @param regions list of regions.
* @param chanceOfNoSwap Chance that this will decide to try a move rather
* than a swap.
* @return a random {@link HRegionInfo} or null if an asymmetrical move is
* suggested.
*/
private HRegionInfo pickRandomRegion(List<HRegionInfo> regions, double chanceOfNoSwap) {
//Check to see if this is just a move.
if (regions.isEmpty() || RANDOM.nextFloat() < chanceOfNoSwap) {
//signal a move only.
return null;
}
int count = 0;
HRegionInfo r = null;
//We will try and find a region up to 10 times. If we always
while (count < 10 && r == null ) {
count++;
r = regions.get(RANDOM.nextInt(regions.size()));
// If this is a special region we always try not to move it.
// so clear out r. try again
if (r.isMetaRegion() || r.isRootRegion() ) {
r = null;
}
}
if (r != null) {
regions.remove(r);
}
return r;
}
/**
* Given a server we will want to switch regions with another server. This
* function picks a random server from the list.
*
* @param server Current Server. This server will never be the return value.
* @param allServers list of all server from which to pick
* @return random server. Null if no other servers were found.
*/
private ServerName pickOtherServer(ServerName server, List<ServerName> allServers) {
ServerName s = null;
int count = 0;
while (count < 100 && (s == null || s.equals(server))) {
count++;
s = allServers.get(RANDOM.nextInt(allServers.size()));
}
// If nothing but the current server was found return null.
return (s == null || s.equals(server)) ? null : s;
}
/**
* This is the main cost function. It will compute a cost associated with a proposed cluster
* state. All different costs will be combined with their multipliers to produce a double cost.
*
* @param initialRegionMapping Map of where the regions started.
* @param clusterState Map of ServerName to list of regions.
* @return a double of a cost associated with the proposed
*/
protected double computeCost(Map<HRegionInfo, ServerName> initialRegionMapping,
Map<ServerName, List<HRegionInfo>> clusterState) {
double moveCost = moveCostMultiplier * computeMoveCost(initialRegionMapping, clusterState);
double regionCountSkewCost = loadMultiplier * computeSkewLoadCost(clusterState);
double tableSkewCost = tableMultiplier * computeTableSkewLoadCost(clusterState);
double localityCost =
localityMultiplier * computeDataLocalityCost(initialRegionMapping, clusterState);
// TODO: Add Read and Write requests back in here after keeping a running average on per
// region load metrics.
double memstoreSizeCost =
memStoreSizeMultiplier
* computeRegionLoadCost(clusterState, RegionLoadCostType.MEMSTORE_SIZE);
double storefileSizeCost =
storeFileSizeMultiplier
* computeRegionLoadCost(clusterState, RegionLoadCostType.STOREFILE_SIZE);
double total =
moveCost + regionCountSkewCost + tableSkewCost + localityCost + memstoreSizeCost
+ storefileSizeCost;
LOG.trace("Computed weights for a potential balancing total = " + total + " moveCost = "
+ moveCost + " regionCountSkewCost = " + regionCountSkewCost + " tableSkewCost = "
+ tableSkewCost + " localityCost = " + localityCost + " memstoreSizeCost = "
+ memstoreSizeCost + " storefileSizeCost = " + storefileSizeCost);
return total;
}
/**
* Given the starting state of the regions and a potential ending state
* compute cost based upon the number of regions that have moved.
*
* @param initialRegionMapping The starting location of regions.
* @param clusterState The potential new cluster state.
* @return The cost. Between 0 and 1.
*/
double computeMoveCost(Map<HRegionInfo, ServerName> initialRegionMapping,
Map<ServerName, List<HRegionInfo>> clusterState) {
float moveCost = 0;
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
for (HRegionInfo region : entry.getValue()) {
if (initialRegionMapping.get(region) != entry.getKey()) {
moveCost += 1;
}
}
}
//Don't let this single balance move more than the max moves.
//This allows better scaling to accurately represent the actual cost of a move.
if (moveCost > maxMoves) {
return 10000; //return a number much greater than any of the other cost functions
}
return scale(0, Math.min(maxMoves, initialRegionMapping.size()), moveCost);
}
/**
* Compute the cost of a potential cluster state from skew in number of
* regions on a cluster
*
* @param clusterState The proposed cluster state
* @return The cost of region load imbalance.
*/
double computeSkewLoadCost(Map<ServerName, List<HRegionInfo>> clusterState) {
DescriptiveStatistics stats = new DescriptiveStatistics();
for (List<HRegionInfo> regions : clusterState.values()) {
int size = regions.size();
stats.addValue(size);
}
return costFromStats(stats);
}
/**
* Compute the cost of a potential cluster configuration based upon how evenly
* distributed tables are.
*
* @param clusterState Proposed cluster state.
* @return Cost of imbalance in table.
*/
double computeTableSkewLoadCost(Map<ServerName, List<HRegionInfo>> clusterState) {
Map<String, MutableInt> tableRegionsTotal = new HashMap<String, MutableInt>();
Map<String, MutableInt> tableRegionsOnCurrentServer = new HashMap<String, MutableInt>();
Map<String, Integer> tableCostSeenSoFar = new HashMap<String, Integer>();
// Go through everything per server
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
tableRegionsOnCurrentServer.clear();
// For all of the regions count how many are from each table
for (HRegionInfo region : entry.getValue()) {
String tableName = region.getTableNameAsString();
// See if this table already has a count on this server
MutableInt regionsOnServerCount = tableRegionsOnCurrentServer.get(tableName);
// If this is the first time we've seen this table on this server
// create a new mutable int.
if (regionsOnServerCount == null) {
regionsOnServerCount = new MutableInt(0);
tableRegionsOnCurrentServer.put(tableName, regionsOnServerCount);
}
// Increment the count of how many regions from this table are host on
// this server
regionsOnServerCount.increment();
// Now count the number of regions in this table.
MutableInt totalCount = tableRegionsTotal.get(tableName);
// If this is the first region from this table create a new counter for
// this table.
if (totalCount == null) {
totalCount = new MutableInt(0);
tableRegionsTotal.put(tableName, totalCount);
}
totalCount.increment();
}
// Now go through all of the tables we have seen and keep the max number
// of regions of this table a single region server is hosting.
for (String tableName : tableRegionsOnCurrentServer.keySet()) {
Integer thisCount = tableRegionsOnCurrentServer.get(tableName).toInteger();
Integer maxCountSoFar = tableCostSeenSoFar.get(tableName);
if (maxCountSoFar == null || thisCount.compareTo(maxCountSoFar) > 0) {
tableCostSeenSoFar.put(tableName, thisCount);
}
}
}
double max = 0;
double min = 0;
double value = 0;
// Compute the min, value, and max.
for (String tableName : tableRegionsTotal.keySet()) {
max += tableRegionsTotal.get(tableName).doubleValue();
min += tableRegionsTotal.get(tableName).doubleValue() / (double) clusterState.size();
value += tableCostSeenSoFar.get(tableName).doubleValue();
}
return scale(min, max, value);
}
/**
* Compute a cost of a potential cluster configuration based upon where
* {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
*
* @param clusterState The state of the cluster
* @return A cost between 0 and 1. 0 Means all regions are on the sever with
* the most local store files.
*/
double computeDataLocalityCost(Map<HRegionInfo, ServerName> initialRegionMapping,
Map<ServerName, List<HRegionInfo>> clusterState) {
double max = 0;
double cost = 0;
// If there's no master so there's no way anything else works.
if (this.services == null) return cost;
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
ServerName sn = entry.getKey();
for (HRegionInfo region : entry.getValue()) {
max += 1;
// Only compute the data locality for moved regions.
if (initialRegionMapping.equals(sn)) {
continue;
}
List<ServerName> dataOnServers = regionFinder.getTopBlockLocations(region);
// If we can't find where the data is getTopBlock returns null.
// so count that as being the best possible.
if (dataOnServers == null) {
continue;
}
int index = dataOnServers.indexOf(sn);
if (index < 0) {
cost += 1;
} else {
cost += (double) index / (double) dataOnServers.size();
}
}
}
return scale(0, max, cost);
}
/** The cost's that can be derived from RegionLoad */
private enum RegionLoadCostType {
READ_REQUEST, WRITE_REQUEST, MEMSTORE_SIZE, STOREFILE_SIZE
}
/**
* Compute the cost of the current cluster state due to some RegionLoadCost type
*
* @param clusterState the cluster
* @param costType what type of cost to consider
* @return the scaled cost.
*/
private double computeRegionLoadCost(Map<ServerName, List<HRegionInfo>> clusterState,
RegionLoadCostType costType) {
if (this.clusterStatus == null || this.loads == null || this.loads.size() == 0) return 0;
DescriptiveStatistics stats = new DescriptiveStatistics();
// For every server look at the cost of each region
for (List<HRegionInfo> regions : clusterState.values()) {
long cost = 0; //Cost this server has from RegionLoad
// For each region
for (HRegionInfo region : regions) {
// Try and get the region using the regionNameAsString
RegionLoad rl = loads.get(region.getRegionNameAsString());
// That could have failed if the RegionLoad is using the other regionName
if (rl == null) {
// Try getting the region load using encoded name.
rl = loads.get(region.getEncodedName());
}
// Now if we found a region load get the type of cost that was requested.
if (rl != null) {
cost += getRegionLoadCost(rl, costType);
}
}
// Add the total cost to the stats.
stats.addValue(cost);
}
// No return the scaled cost from data held in the stats object.
return costFromStats(stats);
}
/**
* Get the un-scaled cost from a RegionLoad
*
* @param rl the Region load
* @param type The type of cost to extract
* @return the double representing the cost
*/
private double getRegionLoadCost(RegionLoad rl, RegionLoadCostType type) {
switch (type) {
case READ_REQUEST:
return rl.getReadRequestsCount();
case WRITE_REQUEST:
return rl.getWriteRequestsCount();
case MEMSTORE_SIZE:
return rl.getMemStoreSizeMB();
case STOREFILE_SIZE:
return rl.getStorefileSizeMB();
default:
assert false : "RegionLoad cost type not supported.";
return 0;
}
}
/**
* Function to compute a scaled cost using {@link DescriptiveStatistics}. It
* assumes that this is a zero sum set of costs. It assumes that the worst case
* possible is all of the elements in one region server and the rest having 0.
*
* @param stats the costs
* @return a scaled set of costs.
*/
double costFromStats(DescriptiveStatistics stats) {
double totalCost = 0;
double mean = stats.getMean();
//Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((stats.getN() - 1) * stats.getMean()) + (stats.getSum() - stats.getMean());
for (double n : stats.getValues()) {
totalCost += Math.abs(mean - n);
}
return scale(0, max, totalCost);
}
/**
* Scale the value between 0 and 1.
*
* @param min Min value
* @param max The Max value
* @param value The value to be scaled.
* @return The scaled value.
*/
private double scale(double min, double max, double value) {
if (max == 0 || value == 0) {
return 0;
}
return Math.max(0d, Math.min(1d, (value - min) / max));
}
}

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
@ -37,35 +38,52 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* Test whether region rebalancing works. (HBASE-71)
* Test whether region re-balancing works. (HBASE-71)
*/
@Category(LargeTests.class)
@RunWith(value = Parameterized.class)
public class TestRegionRebalancing {
final Log LOG = LogFactory.getLog(this.getClass().getName());
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
HTable table;
HTableDescriptor desc;
private static final byte [] FAMILY_NAME = Bytes.toBytes("col");
@BeforeClass
public static void beforeClass() throws Exception {
UTIL.startMiniCluster(1);
@Parameters
public static Collection<Object[]> data() {
Object[][] balancers =
new String[][] { { "org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer" },
{ "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer" } };
return Arrays.asList(balancers);
}
@AfterClass
public static void afterClass() throws Exception {
private static final byte[] FAMILY_NAME = Bytes.toBytes("col");
public static final Log LOG = LogFactory.getLog(TestRegionRebalancing.class);
private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private HTable table;
private HTableDescriptor desc;
private String balancerName;
public TestRegionRebalancing(String balancerName) {
this.balancerName = balancerName;
}
@After
public void after() throws Exception {
UTIL.shutdownMiniCluster();
}
@Before
public void before() {
public void before() throws Exception {
UTIL.getConfiguration().set("hbase.master.loadbalancer.class", this.balancerName);
UTIL.startMiniCluster(1);
this.desc = new HTableDescriptor("test");
this.desc.addFamily(new HColumnDescriptor(FAMILY_NAME));
}

View File

@ -34,12 +34,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
@ -52,13 +50,15 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State;
import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

View File

@ -1,513 +0,0 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test the load balancer that is created by default.
*/
@Category(SmallTests.class)
public class TestDefaultLoadBalancer {
private static final Log LOG = LogFactory.getLog(TestDefaultLoadBalancer.class);
private static LoadBalancer loadBalancer;
private static Random rand;
@BeforeClass
public static void beforeAllTests() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.regions.slop", "0");
loadBalancer = new DefaultLoadBalancer();
loadBalancer.setConf(conf);
rand = new Random();
}
// int[testnum][servernumber] -> numregions
int [][] clusterStateMocks = new int [][] {
// 1 node
new int [] { 0 },
new int [] { 1 },
new int [] { 10 },
// 2 node
new int [] { 0, 0 },
new int [] { 2, 0 },
new int [] { 2, 1 },
new int [] { 2, 2 },
new int [] { 2, 3 },
new int [] { 2, 4 },
new int [] { 1, 1 },
new int [] { 0, 1 },
new int [] { 10, 1 },
new int [] { 14, 1432 },
new int [] { 47, 53 },
// 3 node
new int [] { 0, 1, 2 },
new int [] { 1, 2, 3 },
new int [] { 0, 2, 2 },
new int [] { 0, 3, 0 },
new int [] { 0, 4, 0 },
new int [] { 20, 20, 0 },
// 4 node
new int [] { 0, 1, 2, 3 },
new int [] { 4, 0, 0, 0 },
new int [] { 5, 0, 0, 0 },
new int [] { 6, 6, 0, 0 },
new int [] { 6, 2, 0, 0 },
new int [] { 6, 1, 0, 0 },
new int [] { 6, 0, 0, 0 },
new int [] { 4, 4, 4, 7 },
new int [] { 4, 4, 4, 8 },
new int [] { 0, 0, 0, 7 },
// 5 node
new int [] { 1, 1, 1, 1, 4 },
// more nodes
new int [] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 },
new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 },
new int [] { 6, 6, 5, 6, 6, 6, 6, 6, 6, 1 },
new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 54 },
new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 55 },
new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 56 },
new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 16 },
new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 8 },
new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 9 },
new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 10 },
new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 123 },
new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 155 },
new int [] { 0, 0, 144, 1, 1, 1, 1, 1123, 133, 138, 12, 1444 },
new int [] { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 },
new int [] { 1538, 1392, 1561, 1557, 1535, 1553, 1385, 1542, 1619 }
};
int [][] regionsAndServersMocks = new int [][] {
// { num regions, num servers }
new int [] { 0, 0 },
new int [] { 0, 1 },
new int [] { 1, 1 },
new int [] { 2, 1 },
new int [] { 10, 1 },
new int [] { 1, 2 },
new int [] { 2, 2 },
new int [] { 3, 2 },
new int [] { 1, 3 },
new int [] { 2, 3 },
new int [] { 3, 3 },
new int [] { 25, 3 },
new int [] { 2, 10 },
new int [] { 2, 100 },
new int [] { 12, 10 },
new int [] { 12, 100 },
};
/**
* Test the load balancing algorithm.
*
* Invariant is that all servers should be hosting either
* floor(average) or ceiling(average)
*
* @throws Exception
*/
@Test
public void testBalanceCluster() throws Exception {
for(int [] mockCluster : clusterStateMocks) {
Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(mockCluster);
List <ServerAndLoad> list = convertToList(servers);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
List<ServerAndLoad> balancedCluster = reconcile(list, plans);
LOG.info("Mock Balance : " + printMock(balancedCluster));
assertClusterAsBalanced(balancedCluster);
for(Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
returnRegions(entry.getValue());
returnServer(entry.getKey());
}
}
}
/**
* Invariant is that all servers have between floor(avg) and ceiling(avg)
* number of regions.
*/
public void assertClusterAsBalanced(List<ServerAndLoad> servers) {
int numServers = servers.size();
int numRegions = 0;
int maxRegions = 0;
int minRegions = Integer.MAX_VALUE;
for(ServerAndLoad server : servers) {
int nr = server.getLoad();
if(nr > maxRegions) {
maxRegions = nr;
}
if(nr < minRegions) {
minRegions = nr;
}
numRegions += nr;
}
if(maxRegions - minRegions < 2) {
// less than 2 between max and min, can't balance
return;
}
int min = numRegions / numServers;
int max = numRegions % numServers == 0 ? min : min + 1;
for(ServerAndLoad server : servers) {
assertTrue(server.getLoad() <= max);
assertTrue(server.getLoad() >= min);
}
}
/**
* Tests immediate assignment.
*
* Invariant is that all regions have an assignment.
*
* @throws Exception
*/
@Test
public void testImmediateAssignment() throws Exception {
for(int [] mock : regionsAndServersMocks) {
LOG.debug("testImmediateAssignment with " + mock[0] + " regions and " + mock[1] + " servers");
List<HRegionInfo> regions = randomRegions(mock[0]);
List<ServerAndLoad> servers = randomServers(mock[1], 0);
List<ServerName> list = getListOfServerNames(servers);
Map<HRegionInfo,ServerName> assignments =
loadBalancer.immediateAssignment(regions, list);
assertImmediateAssignment(regions, list, assignments);
returnRegions(regions);
returnServers(list);
}
}
/**
* All regions have an assignment.
* @param regions
* @param servers
* @param assignments
*/
private void assertImmediateAssignment(List<HRegionInfo> regions,
List<ServerName> servers, Map<HRegionInfo, ServerName> assignments) {
for(HRegionInfo region : regions) {
assertTrue(assignments.containsKey(region));
}
}
/**
* Tests the bulk assignment used during cluster startup.
*
* Round-robin. Should yield a balanced cluster so same invariant as the load
* balancer holds, all servers holding either floor(avg) or ceiling(avg).
*
* @throws Exception
*/
@Test
public void testBulkAssignment() throws Exception {
for(int [] mock : regionsAndServersMocks) {
LOG.debug("testBulkAssignment with " + mock[0] + " regions and " + mock[1] + " servers");
List<HRegionInfo> regions = randomRegions(mock[0]);
List<ServerAndLoad> servers = randomServers(mock[1], 0);
List<ServerName> list = getListOfServerNames(servers);
Map<ServerName, List<HRegionInfo>> assignments =
loadBalancer.roundRobinAssignment(regions, list);
float average = (float)regions.size()/servers.size();
int min = (int)Math.floor(average);
int max = (int)Math.ceil(average);
if(assignments != null && !assignments.isEmpty()) {
for(List<HRegionInfo> regionList : assignments.values()) {
assertTrue(regionList.size() == min || regionList.size() == max);
}
}
returnRegions(regions);
returnServers(list);
}
}
/**
* Test the cluster startup bulk assignment which attempts to retain
* assignment info.
* @throws Exception
*/
@Test
public void testRetainAssignment() throws Exception {
// Test simple case where all same servers are there
List<ServerAndLoad> servers = randomServers(10, 10);
List<HRegionInfo> regions = randomRegions(100);
Map<HRegionInfo, ServerName> existing =
new TreeMap<HRegionInfo, ServerName>();
for (int i = 0; i < regions.size(); i++) {
ServerName sn = servers.get(i % servers.size()).getServerName();
// The old server would have had same host and port, but different
// start code!
ServerName snWithOldStartCode =
new ServerName(sn.getHostname(), sn.getPort(), sn.getStartcode() - 10);
existing.put(regions.get(i), snWithOldStartCode);
}
List<ServerName> listOfServerNames = getListOfServerNames(servers);
Map<ServerName, List<HRegionInfo>> assignment =
loadBalancer.retainAssignment(existing, listOfServerNames);
assertRetainedAssignment(existing, listOfServerNames, assignment);
// Include two new servers that were not there before
List<ServerAndLoad> servers2 =
new ArrayList<ServerAndLoad>(servers);
servers2.add(randomServer(10));
servers2.add(randomServer(10));
listOfServerNames = getListOfServerNames(servers2);
assignment = loadBalancer.retainAssignment(existing, listOfServerNames);
assertRetainedAssignment(existing, listOfServerNames, assignment);
// Remove two of the servers that were previously there
List<ServerAndLoad> servers3 =
new ArrayList<ServerAndLoad>(servers);
servers3.remove(0);
servers3.remove(0);
listOfServerNames = getListOfServerNames(servers3);
assignment = loadBalancer.retainAssignment(existing, listOfServerNames);
assertRetainedAssignment(existing, listOfServerNames, assignment);
}
private List<ServerName> getListOfServerNames(final List<ServerAndLoad> sals) {
List<ServerName> list = new ArrayList<ServerName>();
for (ServerAndLoad e: sals) {
list.add(e.getServerName());
}
return list;
}
/**
* Asserts a valid retained assignment plan.
* <p>
* Must meet the following conditions:
* <ul>
* <li>Every input region has an assignment, and to an online server
* <li>If a region had an existing assignment to a server with the same
* address a a currently online server, it will be assigned to it
* </ul>
* @param existing
* @param servers
* @param assignment
*/
private void assertRetainedAssignment(
Map<HRegionInfo, ServerName> existing, List<ServerName> servers,
Map<ServerName, List<HRegionInfo>> assignment) {
// Verify condition 1, every region assigned, and to online server
Set<ServerName> onlineServerSet = new TreeSet<ServerName>(servers);
Set<HRegionInfo> assignedRegions = new TreeSet<HRegionInfo>();
for (Map.Entry<ServerName, List<HRegionInfo>> a : assignment.entrySet()) {
assertTrue("Region assigned to server that was not listed as online",
onlineServerSet.contains(a.getKey()));
for (HRegionInfo r : a.getValue()) assignedRegions.add(r);
}
assertEquals(existing.size(), assignedRegions.size());
// Verify condition 2, if server had existing assignment, must have same
Set<String> onlineHostNames = new TreeSet<String>();
for (ServerName s : servers) {
onlineHostNames.add(s.getHostname());
}
for (Map.Entry<ServerName, List<HRegionInfo>> a : assignment.entrySet()) {
ServerName assignedTo = a.getKey();
for (HRegionInfo r : a.getValue()) {
ServerName address = existing.get(r);
if (address != null && onlineHostNames.contains(address.getHostname())) {
// this region was prevously assigned somewhere, and that
// host is still around, then it should be re-assigned on the
// same host
assertEquals(address.getHostname(), assignedTo.getHostname());
}
}
}
}
private String printStats(List<ServerAndLoad> servers) {
int numServers = servers.size();
int totalRegions = 0;
for(ServerAndLoad server : servers) {
totalRegions += server.getLoad();
}
float average = (float)totalRegions / numServers;
int max = (int)Math.ceil(average);
int min = (int)Math.floor(average);
return "[srvr=" + numServers + " rgns=" + totalRegions + " avg=" + average + " max=" + max + " min=" + min + "]";
}
private List<ServerAndLoad> convertToList(final Map<ServerName, List<HRegionInfo>> servers) {
List<ServerAndLoad> list =
new ArrayList<ServerAndLoad>(servers.size());
for (Map.Entry<ServerName, List<HRegionInfo>> e: servers.entrySet()) {
list.add(new ServerAndLoad(e.getKey(), e.getValue().size()));
}
return list;
}
private String printMock(List<ServerAndLoad> balancedCluster) {
SortedSet<ServerAndLoad> sorted =
new TreeSet<ServerAndLoad>(balancedCluster);
ServerAndLoad [] arr =
sorted.toArray(new ServerAndLoad[sorted.size()]);
StringBuilder sb = new StringBuilder(sorted.size() * 4 + 4);
sb.append("{ ");
for(int i = 0; i < arr.length; i++) {
if (i != 0) {
sb.append(" , ");
}
sb.append(arr[i].getLoad());
}
sb.append(" }");
return sb.toString();
}
/**
* This assumes the RegionPlan HSI instances are the same ones in the map, so
* actually no need to even pass in the map, but I think it's clearer.
* @param list
* @param plans
* @return
*/
private List<ServerAndLoad> reconcile(List<ServerAndLoad> list,
List<RegionPlan> plans) {
List<ServerAndLoad> result =
new ArrayList<ServerAndLoad>(list.size());
if (plans == null) return result;
Map<ServerName, ServerAndLoad> map =
new HashMap<ServerName, ServerAndLoad>(list.size());
for (RegionPlan plan : plans) {
ServerName source = plan.getSource();
updateLoad(map, source, -1);
ServerName destination = plan.getDestination();
updateLoad(map, destination, +1);
}
result.clear();
result.addAll(map.values());
return result;
}
private void updateLoad(Map<ServerName, ServerAndLoad> map,
final ServerName sn, final int diff) {
ServerAndLoad sal = map.get(sn);
if (sal == null) return;
sal = new ServerAndLoad(sn, sal.getLoad() + diff);
map.put(sn, sal);
}
private Map<ServerName, List<HRegionInfo>> mockClusterServers(
int [] mockCluster) {
int numServers = mockCluster.length;
Map<ServerName, List<HRegionInfo>> servers =
new TreeMap<ServerName, List<HRegionInfo>>();
for(int i = 0; i < numServers; i++) {
int numRegions = mockCluster[i];
ServerAndLoad sal = randomServer(0);
List<HRegionInfo> regions = randomRegions(numRegions);
servers.put(sal.getServerName(), regions);
}
return servers;
}
private Queue<HRegionInfo> regionQueue = new LinkedList<HRegionInfo>();
static int regionId = 0;
private List<HRegionInfo> randomRegions(int numRegions) {
List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
byte [] start = new byte[16];
byte [] end = new byte[16];
rand.nextBytes(start);
rand.nextBytes(end);
for(int i=0;i<numRegions;i++) {
if(!regionQueue.isEmpty()) {
regions.add(regionQueue.poll());
continue;
}
Bytes.putInt(start, 0, numRegions << 1);
Bytes.putInt(end, 0, (numRegions << 1) + 1);
HRegionInfo hri = new HRegionInfo(
Bytes.toBytes("table" + i), start, end,
false, regionId++);
regions.add(hri);
}
return regions;
}
private void returnRegions(List<HRegionInfo> regions) {
regionQueue.addAll(regions);
}
private Queue<ServerName> serverQueue = new LinkedList<ServerName>();
private ServerAndLoad randomServer(final int numRegionsPerServer) {
if (!this.serverQueue.isEmpty()) {
ServerName sn = this.serverQueue.poll();
return new ServerAndLoad(sn, numRegionsPerServer);
}
String host = "server" + rand.nextInt(100000);
int port = rand.nextInt(60000);
long startCode = rand.nextLong();
ServerName sn = new ServerName(host, port, startCode);
return new ServerAndLoad(sn, numRegionsPerServer);
}
private List<ServerAndLoad> randomServers(int numServers, int numRegionsPerServer) {
List<ServerAndLoad> servers =
new ArrayList<ServerAndLoad>(numServers);
for (int i = 0; i < numServers; i++) {
servers.add(randomServer(numRegionsPerServer));
}
return servers;
}
private void returnServer(ServerName server) {
serverQueue.add(server);
}
private void returnServers(List<ServerName> servers) {
this.serverQueue.addAll(servers);
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Class used to be the base of unit tests on load balancers. It gives helper
* methods to create maps of {@link ServerName} to lists of {@link HRegionInfo}
* and to check list of region plans.
*
*/
public class BalancerTestBase {
private static Random rand = new Random();
static int regionId = 0;
/**
* Invariant is that all servers have between floor(avg) and ceiling(avg)
* number of regions.
*/
public void assertClusterAsBalanced(List<ServerAndLoad> servers) {
int numServers = servers.size();
int numRegions = 0;
int maxRegions = 0;
int minRegions = Integer.MAX_VALUE;
for (ServerAndLoad server : servers) {
int nr = server.getLoad();
if (nr > maxRegions) {
maxRegions = nr;
}
if (nr < minRegions) {
minRegions = nr;
}
numRegions += nr;
}
if (maxRegions - minRegions < 2) {
// less than 2 between max and min, can't balance
return;
}
int min = numRegions / numServers;
int max = numRegions % numServers == 0 ? min : min + 1;
for (ServerAndLoad server : servers) {
assertTrue(server.getLoad() >= 0);
assertTrue(server.getLoad() <= max);
assertTrue(server.getLoad() >= min);
}
}
protected String printStats(List<ServerAndLoad> servers) {
int numServers = servers.size();
int totalRegions = 0;
for (ServerAndLoad server : servers) {
totalRegions += server.getLoad();
}
float average = (float) totalRegions / numServers;
int max = (int) Math.ceil(average);
int min = (int) Math.floor(average);
return "[srvr=" + numServers + " rgns=" + totalRegions + " avg=" + average + " max=" + max
+ " min=" + min + "]";
}
protected List<ServerAndLoad> convertToList(final Map<ServerName, List<HRegionInfo>> servers) {
List<ServerAndLoad> list = new ArrayList<ServerAndLoad>(servers.size());
for (Map.Entry<ServerName, List<HRegionInfo>> e : servers.entrySet()) {
list.add(new ServerAndLoad(e.getKey(), e.getValue().size()));
}
return list;
}
protected String printMock(List<ServerAndLoad> balancedCluster) {
SortedSet<ServerAndLoad> sorted = new TreeSet<ServerAndLoad>(balancedCluster);
ServerAndLoad[] arr = sorted.toArray(new ServerAndLoad[sorted.size()]);
StringBuilder sb = new StringBuilder(sorted.size() * 4 + 4);
sb.append("{ ");
for (int i = 0; i < arr.length; i++) {
if (i != 0) {
sb.append(" , ");
}
sb.append(arr[i].getServerName().getHostname());
sb.append(":");
sb.append(arr[i].getLoad());
}
sb.append(" }");
return sb.toString();
}
/**
* This assumes the RegionPlan HSI instances are the same ones in the map, so
* actually no need to even pass in the map, but I think it's clearer.
*
* @param list
* @param plans
* @return
*/
protected List<ServerAndLoad> reconcile(List<ServerAndLoad> list, List<RegionPlan> plans) {
List<ServerAndLoad> result = new ArrayList<ServerAndLoad>(list.size());
if (plans == null) return result;
Map<ServerName, ServerAndLoad> map = new HashMap<ServerName, ServerAndLoad>(list.size());
for (ServerAndLoad sl : list) {
map.put(sl.getServerName(), sl);
}
for (RegionPlan plan : plans) {
ServerName source = plan.getSource();
updateLoad(map, source, -1);
ServerName destination = plan.getDestination();
updateLoad(map, destination, +1);
}
result.clear();
result.addAll(map.values());
return result;
}
protected void updateLoad(final Map<ServerName, ServerAndLoad> map,
final ServerName sn,
final int diff) {
ServerAndLoad sal = map.get(sn);
if (sal == null) sal = new ServerAndLoad(sn, 0);
sal = new ServerAndLoad(sn, sal.getLoad() + diff);
map.put(sn, sal);
}
protected Map<ServerName, List<HRegionInfo>> mockClusterServers(int[] mockCluster) {
int numServers = mockCluster.length;
Map<ServerName, List<HRegionInfo>> servers = new TreeMap<ServerName, List<HRegionInfo>>();
for (int i = 0; i < numServers; i++) {
int numRegions = mockCluster[i];
ServerAndLoad sal = randomServer(0);
List<HRegionInfo> regions = randomRegions(numRegions);
servers.put(sal.getServerName(), regions);
}
return servers;
}
private Queue<HRegionInfo> regionQueue = new LinkedList<HRegionInfo>();
protected List<HRegionInfo> randomRegions(int numRegions) {
List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
byte[] start = new byte[16];
byte[] end = new byte[16];
rand.nextBytes(start);
rand.nextBytes(end);
for (int i = 0; i < numRegions; i++) {
if (!regionQueue.isEmpty()) {
regions.add(regionQueue.poll());
continue;
}
Bytes.putInt(start, 0, numRegions << 1);
Bytes.putInt(end, 0, (numRegions << 1) + 1);
HRegionInfo hri = new HRegionInfo(Bytes.toBytes("table" + i), start, end, false, regionId++);
regions.add(hri);
}
return regions;
}
protected void returnRegions(List<HRegionInfo> regions) {
regionQueue.addAll(regions);
}
private Queue<ServerName> serverQueue = new LinkedList<ServerName>();
protected ServerAndLoad randomServer(final int numRegionsPerServer) {
if (!this.serverQueue.isEmpty()) {
ServerName sn = this.serverQueue.poll();
return new ServerAndLoad(sn, numRegionsPerServer);
}
String host = "srv" + rand.nextInt(100000);
int port = rand.nextInt(60000);
long startCode = rand.nextLong();
ServerName sn = new ServerName(host, port, startCode);
return new ServerAndLoad(sn, numRegionsPerServer);
}
protected List<ServerAndLoad> randomServers(int numServers, int numRegionsPerServer) {
List<ServerAndLoad> servers = new ArrayList<ServerAndLoad>(numServers);
for (int i = 0; i < numServers; i++) {
servers.add(randomServer(numRegionsPerServer));
}
return servers;
}
protected void returnServer(ServerName server) {
serverQueue.add(server);
}
protected void returnServers(List<ServerName> servers) {
this.serverQueue.addAll(servers);
}
}

View File

@ -0,0 +1,231 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestBaseLoadBalancer extends BalancerTestBase {
private static LoadBalancer loadBalancer;
private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class);
int[][] regionsAndServersMocks = new int[][] {
// { num regions, num servers }
new int[] { 0, 0 }, new int[] { 0, 1 }, new int[] { 1, 1 }, new int[] { 2, 1 },
new int[] { 10, 1 }, new int[] { 1, 2 }, new int[] { 2, 2 }, new int[] { 3, 2 },
new int[] { 1, 3 }, new int[] { 2, 3 }, new int[] { 3, 3 }, new int[] { 25, 3 },
new int[] { 2, 10 }, new int[] { 2, 100 }, new int[] { 12, 10 }, new int[] { 12, 100 }, };
@BeforeClass
public static void beforeAllTests() throws Exception {
Configuration conf = HBaseConfiguration.create();
loadBalancer = new MockBalancer();
loadBalancer.setConf(conf);
}
public static class MockBalancer extends BaseLoadBalancer {
@Override
public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
return null;
}
}
/**
* Tests immediate assignment.
*
* Invariant is that all regions have an assignment.
*
* @throws Exception
*/
@Test
public void testImmediateAssignment() throws Exception {
for (int[] mock : regionsAndServersMocks) {
LOG.debug("testImmediateAssignment with " + mock[0] + " regions and " + mock[1] + " servers");
List<HRegionInfo> regions = randomRegions(mock[0]);
List<ServerAndLoad> servers = randomServers(mock[1], 0);
List<ServerName> list = getListOfServerNames(servers);
Map<HRegionInfo, ServerName> assignments = loadBalancer.immediateAssignment(regions, list);
assertImmediateAssignment(regions, list, assignments);
returnRegions(regions);
returnServers(list);
}
}
/**
* All regions have an assignment.
* @param regions
* @param servers
* @param assignments
*/
private void assertImmediateAssignment(List<HRegionInfo> regions, List<ServerName> servers,
Map<HRegionInfo, ServerName> assignments) {
for (HRegionInfo region : regions) {
assertTrue(assignments.containsKey(region));
}
}
/**
* Tests the bulk assignment used during cluster startup.
*
* Round-robin. Should yield a balanced cluster so same invariant as the load
* balancer holds, all servers holding either floor(avg) or ceiling(avg).
*
* @throws Exception
*/
@Test
public void testBulkAssignment() throws Exception {
for (int[] mock : regionsAndServersMocks) {
LOG.debug("testBulkAssignment with " + mock[0] + " regions and " + mock[1] + " servers");
List<HRegionInfo> regions = randomRegions(mock[0]);
List<ServerAndLoad> servers = randomServers(mock[1], 0);
List<ServerName> list = getListOfServerNames(servers);
Map<ServerName, List<HRegionInfo>> assignments =
loadBalancer.roundRobinAssignment(regions, list);
float average = (float) regions.size() / servers.size();
int min = (int) Math.floor(average);
int max = (int) Math.ceil(average);
if (assignments != null && !assignments.isEmpty()) {
for (List<HRegionInfo> regionList : assignments.values()) {
assertTrue(regionList.size() == min || regionList.size() == max);
}
}
returnRegions(regions);
returnServers(list);
}
}
/**
* Test the cluster startup bulk assignment which attempts to retain
* assignment info.
* @throws Exception
*/
@Test
public void testRetainAssignment() throws Exception {
// Test simple case where all same servers are there
List<ServerAndLoad> servers = randomServers(10, 10);
List<HRegionInfo> regions = randomRegions(100);
Map<HRegionInfo, ServerName> existing = new TreeMap<HRegionInfo, ServerName>();
for (int i = 0; i < regions.size(); i++) {
ServerName sn = servers.get(i % servers.size()).getServerName();
// The old server would have had same host and port, but different
// start code!
ServerName snWithOldStartCode =
new ServerName(sn.getHostname(), sn.getPort(), sn.getStartcode() - 10);
existing.put(regions.get(i), snWithOldStartCode);
}
List<ServerName> listOfServerNames = getListOfServerNames(servers);
Map<ServerName, List<HRegionInfo>> assignment =
loadBalancer.retainAssignment(existing, listOfServerNames);
assertRetainedAssignment(existing, listOfServerNames, assignment);
// Include two new servers that were not there before
List<ServerAndLoad> servers2 = new ArrayList<ServerAndLoad>(servers);
servers2.add(randomServer(10));
servers2.add(randomServer(10));
listOfServerNames = getListOfServerNames(servers2);
assignment = loadBalancer.retainAssignment(existing, listOfServerNames);
assertRetainedAssignment(existing, listOfServerNames, assignment);
// Remove two of the servers that were previously there
List<ServerAndLoad> servers3 = new ArrayList<ServerAndLoad>(servers);
servers3.remove(0);
servers3.remove(0);
listOfServerNames = getListOfServerNames(servers3);
assignment = loadBalancer.retainAssignment(existing, listOfServerNames);
assertRetainedAssignment(existing, listOfServerNames, assignment);
}
private List<ServerName> getListOfServerNames(final List<ServerAndLoad> sals) {
List<ServerName> list = new ArrayList<ServerName>();
for (ServerAndLoad e : sals) {
list.add(e.getServerName());
}
return list;
}
/**
* Asserts a valid retained assignment plan.
* <p>
* Must meet the following conditions:
* <ul>
* <li>Every input region has an assignment, and to an online server
* <li>If a region had an existing assignment to a server with the same
* address a a currently online server, it will be assigned to it
* </ul>
* @param existing
* @param servers
* @param assignment
*/
private void assertRetainedAssignment(Map<HRegionInfo, ServerName> existing,
List<ServerName> servers, Map<ServerName, List<HRegionInfo>> assignment) {
// Verify condition 1, every region assigned, and to online server
Set<ServerName> onlineServerSet = new TreeSet<ServerName>(servers);
Set<HRegionInfo> assignedRegions = new TreeSet<HRegionInfo>();
for (Map.Entry<ServerName, List<HRegionInfo>> a : assignment.entrySet()) {
assertTrue("Region assigned to server that was not listed as online",
onlineServerSet.contains(a.getKey()));
for (HRegionInfo r : a.getValue())
assignedRegions.add(r);
}
assertEquals(existing.size(), assignedRegions.size());
// Verify condition 2, if server had existing assignment, must have same
Set<String> onlineHostNames = new TreeSet<String>();
for (ServerName s : servers) {
onlineHostNames.add(s.getHostname());
}
for (Map.Entry<ServerName, List<HRegionInfo>> a : assignment.entrySet()) {
ServerName assignedTo = a.getKey();
for (HRegionInfo r : a.getValue()) {
ServerName address = existing.get(r);
if (address != null && onlineHostNames.contains(address.getHostname())) {
// this region was prevously assigned somewhere, and that
// host is still around, then it should be re-assigned on the
// same host
assertEquals(address.getHostname(), assignedTo.getHostname());
}
}
}
}
}

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test the load balancer that is created by default.
*/
@Category(MediumTests.class)
public class TestDefaultLoadBalancer extends BalancerTestBase {
private static final Log LOG = LogFactory.getLog(TestDefaultLoadBalancer.class);
private static LoadBalancer loadBalancer;
@BeforeClass
public static void beforeAllTests() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.regions.slop", "0");
loadBalancer = new DefaultLoadBalancer();
loadBalancer.setConf(conf);
}
// int[testnum][servernumber] -> numregions
int[][] clusterStateMocks = new int[][] {
// 1 node
new int[] { 0 },
new int[] { 1 },
new int[] { 10 },
// 2 node
new int[] { 0, 0 },
new int[] { 2, 0 },
new int[] { 2, 1 },
new int[] { 2, 2 },
new int[] { 2, 3 },
new int[] { 2, 4 },
new int[] { 1, 1 },
new int[] { 0, 1 },
new int[] { 10, 1 },
new int[] { 14, 1432 },
new int[] { 47, 53 },
// 3 node
new int[] { 0, 1, 2 },
new int[] { 1, 2, 3 },
new int[] { 0, 2, 2 },
new int[] { 0, 3, 0 },
new int[] { 0, 4, 0 },
new int[] { 20, 20, 0 },
// 4 node
new int[] { 0, 1, 2, 3 },
new int[] { 4, 0, 0, 0 },
new int[] { 5, 0, 0, 0 },
new int[] { 6, 6, 0, 0 },
new int[] { 6, 2, 0, 0 },
new int[] { 6, 1, 0, 0 },
new int[] { 6, 0, 0, 0 },
new int[] { 4, 4, 4, 7 },
new int[] { 4, 4, 4, 8 },
new int[] { 0, 0, 0, 7 },
// 5 node
new int[] { 1, 1, 1, 1, 4 },
// more nodes
new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 },
new int[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 }, new int[] { 6, 6, 5, 6, 6, 6, 6, 6, 6, 1 },
new int[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 54 }, new int[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 55 },
new int[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 56 }, new int[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 16 },
new int[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 8 }, new int[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 9 },
new int[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 10 }, new int[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 123 },
new int[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 155 },
new int[] { 0, 0, 144, 1, 1, 1, 1, 1123, 133, 138, 12, 1444 },
new int[] { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 },
new int[] { 1538, 1392, 1561, 1557, 1535, 1553, 1385, 1542, 1619 } };
/**
* Test the load balancing algorithm.
*
* Invariant is that all servers should be hosting either floor(average) or
* ceiling(average)
*
* @throws Exception
*/
@Test
public void testBalanceCluster() throws Exception {
for (int[] mockCluster : clusterStateMocks) {
Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(mockCluster);
List<ServerAndLoad> list = convertToList(servers);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
List<ServerAndLoad> balancedCluster = reconcile(list, plans);
LOG.info("Mock Balance : " + printMock(balancedCluster));
assertClusterAsBalanced(balancedCluster);
for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
returnRegions(entry.getValue());
returnServer(entry.getKey());
}
}
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}

View File

@ -0,0 +1,183 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestStochasticLoadBalancer extends BalancerTestBase {
private static StochasticLoadBalancer loadBalancer;
private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class);
@BeforeClass
public static void beforeAllTests() throws Exception {
Configuration conf = HBaseConfiguration.create();
loadBalancer = new StochasticLoadBalancer();
loadBalancer.setConf(conf);
}
// int[testnum][servernumber] -> numregions
int[][] clusterStateMocks = new int[][]{
// 1 node
new int[]{0},
new int[]{1},
new int[]{10},
// 2 node
new int[]{0, 0},
new int[]{2, 0},
new int[]{2, 1},
new int[]{2, 2},
new int[]{2, 3},
new int[]{2, 4},
new int[]{1, 1},
new int[]{0, 1},
new int[]{10, 1},
new int[]{514, 1432},
new int[]{47, 53},
// 3 node
new int[]{0, 1, 2},
new int[]{1, 2, 3},
new int[]{0, 2, 2},
new int[]{0, 3, 0},
new int[]{0, 4, 0},
new int[]{20, 20, 0},
// 4 node
new int[]{0, 1, 2, 3},
new int[]{4, 0, 0, 0},
new int[]{5, 0, 0, 0},
new int[]{6, 6, 0, 0},
new int[]{6, 2, 0, 0},
new int[]{6, 1, 0, 0},
new int[]{6, 0, 0, 0},
new int[]{4, 4, 4, 7},
new int[]{4, 4, 4, 8},
new int[]{0, 0, 0, 7},
// 5 node
new int[]{1, 1, 1, 1, 4},
// more nodes
new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 10},
new int[]{6, 6, 5, 6, 6, 6, 6, 6, 6, 1},
new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 54},
new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 55},
new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 56},
new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 16},
new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 8},
new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 9},
new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 10},
new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 123},
new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 155},
};
/**
* Test the load balancing algorithm.
*
* Invariant is that all servers should be hosting either floor(average) or
* ceiling(average)
*
* @throws Exception
*/
@Test
public void testBalanceCluster() throws Exception {
for (int[] mockCluster : clusterStateMocks) {
Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(mockCluster);
List<ServerAndLoad> list = convertToList(servers);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
List<ServerAndLoad> balancedCluster = reconcile(list, plans);
LOG.info("Mock Balance : " + printMock(balancedCluster));
assertClusterAsBalanced(balancedCluster);
for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
returnRegions(entry.getValue());
returnServer(entry.getKey());
}
}
}
@Test
public void testSkewCost() {
for (int[] mockCluster : clusterStateMocks) {
double cost = loadBalancer.computeSkewLoadCost(mockClusterServers(mockCluster));
assertTrue(cost >= 0);
assertTrue(cost <= 1.01);
}
assertEquals(1,
loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 0, 0, 0, 1 })), 0.01);
assertEquals(.75,
loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 0, 0, 1, 1 })), 0.01);
assertEquals(.5,
loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 0, 1, 1, 1 })), 0.01);
assertEquals(.25,
loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 1, 1, 1, 1 })), 0.01);
assertEquals(0,
loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 1, 1, 1, 1, 1 })), 0.01);
assertEquals(0,
loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 10, 10, 10, 10, 10 })), 0.01);
}
@Test
public void testTableSkewCost() {
for (int[] mockCluster : clusterStateMocks) {
double cost = loadBalancer.computeTableSkewLoadCost(mockClusterServers(mockCluster));
assertTrue(cost >= 0);
assertTrue(cost <= 1.01);
}
}
@Test
public void testCostFromStats() {
DescriptiveStatistics statOne = new DescriptiveStatistics();
for (int i =0; i < 100; i++) {
statOne.addValue(10);
}
assertEquals(0, loadBalancer.costFromStats(statOne), 0.01);
DescriptiveStatistics statTwo = new DescriptiveStatistics();
for (int i =0; i < 100; i++) {
statTwo.addValue(0);
}
statTwo.addValue(100);
assertEquals(1, loadBalancer.costFromStats(statTwo), 0.01);
DescriptiveStatistics statThree = new DescriptiveStatistics();
for (int i =0; i < 100; i++) {
statThree.addValue(0);
statThree.addValue(100);
}
assertEquals(0.5, loadBalancer.costFromStats(statThree), 0.01);
}
}

11
pom.xml
View File

@ -794,6 +794,11 @@
<artifactId>commons-logging</artifactId>
<version>${commons-logging.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math</artifactId>
<version>${commons-math.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
@ -1012,12 +1017,6 @@
<version>${mockito-all.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math</artifactId>
<version>${commons-math.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>