diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 0b539968abe..6c019041d32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -3916,7 +3917,8 @@ public class AssignmentManager extends ZooKeeperListener {
     return this.balancer;
   }
 
-  public Map<ServerName, List<HRegionInfo>> getSnapShotOfAssignment(List<HRegionInfo> infos) {
+  public Map<ServerName, List<HRegionInfo>>
+      getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
     return getRegionStates().getRegionAssignments(infos);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
index 0f6737b6eeb..0b2e2f07f2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
@@ -41,6 +41,9 @@ public class RackManager {
 
   private DNSToSwitchMapping switchMapping;
 
+  public RackManager() {
+  }
+
   public RackManager(Configuration conf) {
     switchMapping = ReflectionUtils.instantiateWithCustomCtor(
       conf.getClass("hbase.util.ip.to.rack.determiner", ScriptBasedMapping.class,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index 0400e197e22..549265f0ba7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -153,7 +154,8 @@ public class RegionStates {
    * @param regions
    * @return a pair containing the groupings as a map
    */
-  synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(List<HRegionInfo> regions) {
+  synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(
+      Collection<HRegionInfo> regions) {
     Map<ServerName, List<HRegionInfo>> map = new HashMap<ServerName, List<HRegionInfo>>();
     for (HRegionInfo region : regions) {
       HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
@@ -900,6 +902,19 @@ public class RegionStates {
     return getRegionState(hri.getEncodedName());
   }
 
+  /**
+   * Returns a clone of region assignments per server
+   * @return a Map of ServerName to a List of HRegionInfo's
+   */
+  protected synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer() {
+    Map<ServerName, List<HRegionInfo>> regionsByServer =
+        new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
+    for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
+      regionsByServer.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
+    }
+    return regionsByServer;
+  }
+
   protected synchronized RegionState getRegionState(final String encodedName) {
     return regionStates.get(encodedName);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 1c3c647205c..4053d6af1fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -34,6 +34,7 @@ import java.util.Random; import java.util.Set; import java.util.TreeMap; +import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -48,9 +49,13 @@ import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.security.access.AccessControlLists; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.master.RackManager; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; import com.google.common.base.Joiner; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; /** @@ -63,94 +68,184 @@ public abstract class BaseLoadBalancer implements LoadBalancer { private static final int MIN_SERVER_BALANCE = 2; private volatile boolean stopped = false; + private static final List EMPTY_REGION_LIST = new ArrayList(0); + + protected final RegionLocationFinder regionFinder = new RegionLocationFinder(); + + private static class DefaultRackManager extends RackManager { + @Override + public String getRack(ServerName server) { + return UNKNOWN_RACK; + } + } + /** * An efficient array based implementation similar to ClusterState for keeping * the status of the cluster in terms of region assignment and distribution. - * To be used by LoadBalancers. + * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of + * hundreds of thousands of hashmap manipulations are very costly, which is why this + * class uses mostly indexes and arrays. + * + * Cluster tracks a list of unassigned regions, region assignments, and the server + * topology in terms of server names, hostnames and racks. */ protected static class Cluster { ServerName masterServerName; Set tablesOnMaster; ServerName[] servers; + String[] hosts; // ServerName uniquely identifies a region server. 
multiple RS can run on the same host + String[] racks; + boolean multiServersPerHost = false; // whether or not any host has more than one server + ArrayList tables; HRegionInfo[] regions; Deque[] regionLoads; boolean[] backupMasterFlags; int activeMasterIndex = -1; + int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality + int[] serverIndexToHostIndex; //serverIndex -> host index + int[] serverIndexToRackIndex; //serverIndex -> rack index + int[][] regionsPerServer; //serverIndex -> region list + int[][] regionsPerHost; //hostIndex -> list of regions + int[][] regionsPerRack; //rackIndex -> region list + int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index + int[][] primariesOfRegionsPerHost; //hostIndex -> sorted list of regions by primary region index + int[][] primariesOfRegionsPerRack; //rackIndex -> sorted list of regions by primary region index + + int[][] serversPerHost; //hostIndex -> list of server indexes + int[][] serversPerRack; //rackIndex -> list of server indexes int[] regionIndexToServerIndex; //regionIndex -> serverIndex int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state) int[] regionIndexToTableIndex; //regionIndex -> tableIndex int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS int numUserRegionsOnMaster; //number of user regions on the active master + int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary + boolean hasRegionReplicas = false; //whether there is regions with replicas Integer[] serverIndicesSortedByRegionCount; Map serversToIndex; + Map hostsToIndex; + Map racksToIndex; Map tablesToIndex; + Map regionsToIndex; - int numRegions; int numServers; + int numHosts; + int numRacks; int numTables; + int numRegions; int numMovedRegions = 0; //num moved regions from the initial configuration // num of moved regions away from master that should be on the master int numMovedMasterHostedRegions = 0; - @SuppressWarnings("unchecked") - protected Cluster(ServerName masterServerName, + protected final RackManager rackManager; + + protected Cluster( + ServerName masterServerName, Map> clusterState, Map> loads, RegionLocationFinder regionFinder, Collection backupMasters, - Set tablesOnMaster) { + Set tablesOnMaster, + RackManager rackManager) { + this(masterServerName, null, clusterState, loads, regionFinder, backupMasters, + tablesOnMaster, rackManager); + } + + protected Cluster( + ServerName masterServerName, + Collection unassignedRegions, + Map> clusterState, + Map> loads, + RegionLocationFinder regionFinder, + Collection backupMasters, + Set tablesOnMaster, + RackManager rackManager) { + + if (unassignedRegions == null) { + unassignedRegions = EMPTY_REGION_LIST; + } - this.tablesOnMaster = tablesOnMaster; this.masterServerName = masterServerName; + this.tablesOnMaster = tablesOnMaster; + serversToIndex = new HashMap(); + hostsToIndex = new HashMap(); + racksToIndex = new HashMap(); tablesToIndex = new HashMap(); - //regionsToIndex = new HashMap(); //TODO: We should get the list of tables from master tables = new ArrayList(); + this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); numRegions = 0; - int serverIndex = 0; + List> serversPerHostList = new ArrayList>(); + List> serversPerRackList = new ArrayList>(); // Use servername and port as there can be dead servers in this list. 
We want everything with // a matching hostname and port to have the same index. - for (ServerName sn:clusterState.keySet()) { + for (ServerName sn : clusterState.keySet()) { if (serversToIndex.get(sn.getHostAndPort()) == null) { - serversToIndex.put(sn.getHostAndPort(), serverIndex++); + serversToIndex.put(sn.getHostAndPort(), numServers++); } + if (!hostsToIndex.containsKey(sn.getHostname())) { + hostsToIndex.put(sn.getHostname(), numHosts++); + serversPerHostList.add(new ArrayList(1)); + } + + int serverIndex = serversToIndex.get(sn.getHostAndPort()); + int hostIndex = hostsToIndex.get(sn.getHostname()); + serversPerHostList.get(hostIndex).add(serverIndex); + + String rack = this.rackManager.getRack(sn); + if (!racksToIndex.containsKey(rack)) { + racksToIndex.put(rack, numRacks++); + serversPerRackList.add(new ArrayList()); + } + int rackIndex = racksToIndex.get(rack); + serversPerRackList.get(rackIndex).add(serverIndex); } // Count how many regions there are. for (Entry> entry : clusterState.entrySet()) { numRegions += entry.getValue().size(); } + numRegions += unassignedRegions.size(); - numServers = serversToIndex.size(); - regionsPerServer = new int[serversToIndex.size()][]; - + regionsToIndex = new HashMap(numRegions); servers = new ServerName[numServers]; + serversPerHost = new int[numHosts][]; + serversPerRack = new int[numRacks][]; regions = new HRegionInfo[numRegions]; regionIndexToServerIndex = new int[numRegions]; initialRegionIndexToServerIndex = new int[numRegions]; regionIndexToTableIndex = new int[numRegions]; + regionIndexToPrimaryIndex = new int[numRegions]; regionLoads = new Deque[numRegions]; regionLocations = new int[numRegions][]; serverIndicesSortedByRegionCount = new Integer[numServers]; backupMasterFlags = new boolean[numServers]; + serverIndexToHostIndex = new int[numServers]; + serverIndexToRackIndex = new int[numServers]; + regionsPerServer = new int[numServers][]; + regionsPerHost = new int[numHosts][]; + regionsPerRack = new int[numRacks][]; + primariesOfRegionsPerServer = new int[numServers][]; + primariesOfRegionsPerHost = new int[numHosts][]; + primariesOfRegionsPerRack = new int[numRacks][]; + int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; for (Entry> entry : clusterState.entrySet()) { - serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); + int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); // keep the servername if this is the first server name for this hostname // or this servername has the newest startcode. 
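The Cluster comment above stresses that the balancer avoids per-move hashmap bookkeeping: every server, host, rack, table and region is translated to a dense int index once, and all later state (regionsPerServer, regionIndexToServerIndex, primariesOfRegionsPerServer, and so on) lives in primitive arrays. A minimal standalone sketch of that encoding follows; it is illustrative only — the class name and literal data are invented, and removeRegion/addRegion here are simplified stand-ins for the patch's helpers of the same name.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class IndexedClusterSketch {

  // serverIndex -> indexes of the regions it hosts (cf. regionsPerServer in the patch)
  static int[][] regionsPerServer = { { 0, 1 }, { 2 } };
  // regionIndex -> serverIndex (cf. regionIndexToServerIndex in the patch)
  static int[] regionIndexToServerIndex = { 0, 0, 1 };

  public static void main(String[] args) {
    // Names are resolved to dense indexes exactly once, up front.
    Map<String, Integer> serversToIndex = new HashMap<String, Integer>();
    serversToIndex.put("rs1:16020", 0);
    serversToIndex.put("rs2:16020", 1);

    // A candidate "move region 1 from rs1 to rs2" is then pure array surgery,
    // which is what keeps tens of thousands of balancer iterations cheap.
    int region = 1;
    int from = serversToIndex.get("rs1:16020");
    int to = serversToIndex.get("rs2:16020");
    regionsPerServer[from] = removeRegion(regionsPerServer[from], region);
    regionsPerServer[to] = addRegion(regionsPerServer[to], region);
    regionIndexToServerIndex[region] = to;

    System.out.println(Arrays.deepToString(regionsPerServer)); // [[0], [2, 1]]
  }

  static int[] removeRegion(int[] regions, int regionIndex) {
    int[] out = new int[regions.length - 1];
    int i = 0;
    for (int r : regions) {
      if (r != regionIndex) {
        out[i++] = r;
      }
    }
    return out;
  }

  static int[] addRegion(int[] regions, int regionIndex) {
    int[] out = Arrays.copyOf(regions, regions.length + 1);
    out[regions.length] = regionIndex;
    return out;
  }
}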
@@ -168,6 +263,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } else { regionsPerServer[serverIndex] = new int[entry.getValue().size()]; } + primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length]; serverIndicesSortedByRegionCount[serverIndex] = serverIndex; if (servers[serverIndex].equals(masterServerName)) { @@ -180,50 +276,53 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } + hosts = new String[numHosts]; + for (Entry entry : hostsToIndex.entrySet()) { + hosts[entry.getValue()] = entry.getKey(); + } + racks = new String[numRacks]; + for (Entry entry : racksToIndex.entrySet()) { + racks[entry.getValue()] = entry.getKey(); + } + for (Entry> entry : clusterState.entrySet()) { - serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); + int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); regionPerServerIndex = 0; + int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); + serverIndexToHostIndex[serverIndex] = hostIndex; + + int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); + serverIndexToRackIndex[serverIndex] = rackIndex; + for (HRegionInfo region : entry.getValue()) { - String tableName = region.getTable().getNameAsString(); - Integer idx = tablesToIndex.get(tableName); - if (idx == null) { - tables.add(tableName); - idx = tableIndex; - tablesToIndex.put(tableName, tableIndex++); - } + registerRegion(region, regionIndex, serverIndex, loads, regionFinder); - regions[regionIndex] = region; - regionIndexToServerIndex[regionIndex] = serverIndex; - initialRegionIndexToServerIndex[regionIndex] = serverIndex; - regionIndexToTableIndex[regionIndex] = idx; regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; - - // region load - if (loads != null) { - Deque rl = loads.get(region.getRegionNameAsString()); - // That could have failed if the RegionLoad is using the other regionName - if (rl == null) { - // Try getting the region load using encoded name. - rl = loads.get(region.getEncodedName()); - } - regionLoads[regionIndex] = rl; - } - - if (regionFinder != null) { - //region location - List loc = regionFinder.getTopBlockLocations(region); - regionLocations[regionIndex] = new int[loc.size()]; - for (int i=0; i < loc.size(); i++) { - regionLocations[regionIndex][i] = - loc.get(i) == null ? -1 : - (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? 
-1 : serversToIndex.get(loc.get(i).getHostAndPort())); - } - } - regionIndex++; } } + for (HRegionInfo region : unassignedRegions) { + registerRegion(region, regionIndex, -1, loads, regionFinder); + regionIndex++; + } + + for (int i = 0; i < serversPerHostList.size(); i++) { + serversPerHost[i] = new int[serversPerHostList.get(i).size()]; + for (int j = 0; j < serversPerHost[i].length; j++) { + serversPerHost[i][j] = serversPerHostList.get(i).get(j); + } + if (serversPerHost[i].length > 1) { + multiServersPerHost = true; + } + } + + for (int i = 0; i < serversPerRackList.size(); i++) { + serversPerRack[i] = new int[serversPerRackList.get(i).size()]; + for (int j = 0; j < serversPerRack[i].length; j++) { + serversPerRack[i][j] = serversPerRackList.get(i).get(j); + } + } numTables = tables.size(); numRegionsPerServerPerTable = new int[numServers][numTables]; @@ -235,76 +334,339 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } for (int i=0; i < regionIndexToServerIndex.length; i++) { - numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++; + if (regionIndexToServerIndex[i] >= 0) { + numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++; + } } numMaxRegionsPerTable = new int[numTables]; - for (serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) { + for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) { for (tableIndex = 0 ; tableIndex < numRegionsPerServerPerTable[serverIndex].length; tableIndex++) { if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) { numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex]; } } } - } - public void moveOrSwapRegion(int lServer, int rServer, int lRegion, int rRegion) { - if (servers[lServer].equals(masterServerName)) { - if (lRegion >= 0 && !shouldBeOnMaster(regions[lRegion])) { - numUserRegionsOnMaster--; - } - if (rRegion >= 0 && !shouldBeOnMaster(regions[rRegion])) { - numUserRegionsOnMaster++; - } - } else if (servers[rServer].equals(masterServerName)) { - if (lRegion >= 0 && !shouldBeOnMaster(regions[lRegion])) { - numUserRegionsOnMaster++; - } - if (rRegion >= 0 && !shouldBeOnMaster(regions[rRegion])) { - numUserRegionsOnMaster--; + for (int i = 0; i < regions.length; i ++) { + HRegionInfo info = regions[i]; + if (RegionReplicaUtil.isDefaultReplica(info)) { + regionIndexToPrimaryIndex[i] = i; + } else { + hasRegionReplicas = true; + HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); + regionIndexToPrimaryIndex[i] = + regionsToIndex.containsKey(primaryInfo) ? 
+ regionsToIndex.get(primaryInfo): + -1; } } - //swap - if (rRegion >= 0 && lRegion >= 0) { - regionMoved(rRegion, rServer, lServer); - regionsPerServer[rServer] = replaceRegion(regionsPerServer[rServer], rRegion, lRegion); - regionMoved(lRegion, lServer, rServer); - regionsPerServer[lServer] = replaceRegion(regionsPerServer[lServer], lRegion, rRegion); - } else if (rRegion >= 0) { //move rRegion - regionMoved(rRegion, rServer, lServer); - regionsPerServer[rServer] = removeRegion(regionsPerServer[rServer], rRegion); - regionsPerServer[lServer] = addRegion(regionsPerServer[lServer], rRegion); - } else if (lRegion >= 0) { //move lRegion - regionMoved(lRegion, lServer, rServer); - regionsPerServer[lServer] = removeRegion(regionsPerServer[lServer], lRegion); - regionsPerServer[rServer] = addRegion(regionsPerServer[rServer], lRegion); + + for (int i = 0; i < regionsPerServer.length; i++) { + primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length]; + for (int j = 0; j < regionsPerServer[i].length; j++) { + int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; + primariesOfRegionsPerServer[i][j] = primaryIndex; + } + // sort the regions by primaries. + Arrays.sort(primariesOfRegionsPerServer[i]); + } + + // compute regionsPerHost + if (multiServersPerHost) { + for (int i = 0 ; i < serversPerHost.length; i++) { + int numRegionsPerHost = 0; + for (int j = 0; j < serversPerHost[i].length; j++) { + numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length; + } + regionsPerHost[i] = new int[numRegionsPerHost]; + primariesOfRegionsPerHost[i] = new int[numRegionsPerHost]; + } + for (int i = 0 ; i < serversPerHost.length; i++) { + int numRegionPerHostIndex = 0; + for (int j = 0; j < serversPerHost[i].length; j++) { + for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) { + int region = regionsPerServer[serversPerHost[i][j]][k]; + regionsPerHost[i][numRegionPerHostIndex] = region; + int primaryIndex = regionIndexToPrimaryIndex[region]; + primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex; + numRegionPerHostIndex++; + } + } + // sort the regions by primaries. + Arrays.sort(primariesOfRegionsPerHost[i]); + } + } + + // compute regionsPerRack + if (numRacks > 1) { + for (int i = 0 ; i < serversPerRack.length; i++) { + int numRegionsPerRack = 0; + for (int j = 0; j < serversPerRack[i].length; j++) { + numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length; + } + regionsPerRack[i] = new int[numRegionsPerRack]; + primariesOfRegionsPerRack[i] = new int[numRegionsPerRack]; + } + + for (int i = 0 ; i < serversPerRack.length; i++) { + int numRegionPerRackIndex = 0; + for (int j = 0; j < serversPerRack[i].length; j++) { + for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) { + int region = regionsPerServer[serversPerRack[i][j]][k]; + regionsPerRack[i][numRegionPerRackIndex] = region; + int primaryIndex = regionIndexToPrimaryIndex[region]; + primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex; + numRegionPerRackIndex++; + } + } + // sort the regions by primaries. 
+ Arrays.sort(primariesOfRegionsPerRack[i]); + } } } - /** Region moved out of the server */ - void regionMoved(int regionIndex, int oldServerIndex, int newServerIndex) { - regionIndexToServerIndex[regionIndex] = newServerIndex; - if (initialRegionIndexToServerIndex[regionIndex] == newServerIndex) { + /** Helper for Cluster constructor to handle a region */ + private void registerRegion(HRegionInfo region, int regionIndex, int serverIndex, + Map> loads, RegionLocationFinder regionFinder) { + String tableName = region.getTable().getNameAsString(); + if (!tablesToIndex.containsKey(tableName)) { + tables.add(tableName); + tablesToIndex.put(tableName, tablesToIndex.size()); + } + int tableIndex = tablesToIndex.get(tableName); + + regionsToIndex.put(region, regionIndex); + regions[regionIndex] = region; + regionIndexToServerIndex[regionIndex] = serverIndex; + initialRegionIndexToServerIndex[regionIndex] = serverIndex; + regionIndexToTableIndex[regionIndex] = tableIndex; + + // region load + if (loads != null) { + Deque rl = loads.get(region.getRegionNameAsString()); + // That could have failed if the RegionLoad is using the other regionName + if (rl == null) { + // Try getting the region load using encoded name. + rl = loads.get(region.getEncodedName()); + } + regionLoads[regionIndex] = rl; + } + + if (regionFinder != null) { + //region location + List loc = regionFinder.getTopBlockLocations(region); + regionLocations[regionIndex] = new int[loc.size()]; + for (int i=0; i < loc.size(); i++) { + regionLocations[regionIndex][i] = + loc.get(i) == null ? -1 : + (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 + : serversToIndex.get(loc.get(i).getHostAndPort())); + } + } + } + + /** An action to move or swap a region */ + public static class Action { + public static enum Type { + ASSIGN_REGION, + MOVE_REGION, + SWAP_REGIONS, + NULL, + } + + public Type type; + public Action (Type type) {this.type = type;} + /** Returns an Action which would undo this action */ + public Action undoAction() { return this; } + @Override + public String toString() { return type + ":";} + } + + public static class AssignRegionAction extends Action { + public int region; + public int server; + public AssignRegionAction(int region, int server) { + super(Type.ASSIGN_REGION); + this.region = region; + this.server = server; + } + @Override + public Action undoAction() { + // TODO implement this. This action is not being used by the StochasticLB for now + // in case it uses it, we should implement this function. 
+ throw new NotImplementedException(); + } + @Override + public String toString() { + return type + ": " + region + ":" + server; + } + } + + public static class MoveRegionAction extends Action { + public int region; + public int fromServer; + public int toServer; + + public MoveRegionAction(int region, int fromServer, int toServer) { + super(Type.MOVE_REGION); + this.fromServer = fromServer; + this.region = region; + this.toServer = toServer; + } + @Override + public Action undoAction() { + return new MoveRegionAction (region, toServer, fromServer); + } + @Override + public String toString() { + return type + ": " + region + ":" + fromServer + " -> " + toServer; + } + } + + public static class SwapRegionsAction extends Action { + public int fromServer; + public int fromRegion; + public int toServer; + public int toRegion; + public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) { + super(Type.SWAP_REGIONS); + this.fromServer = fromServer; + this.fromRegion = fromRegion; + this.toServer = toServer; + this.toRegion = toRegion; + } + @Override + public Action undoAction() { + return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion); + } + @Override + public String toString() { + return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer; + } + } + + public static Action NullAction = new Action(Type.NULL); + + public void doAction(Action action) { + switch (action.type) { + case NULL: break; + case ASSIGN_REGION: + AssignRegionAction ar = (AssignRegionAction) action; + regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region); + regionMoved(ar.region, -1, ar.server); + break; + case MOVE_REGION: + MoveRegionAction mra = (MoveRegionAction) action; + regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region); + regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region); + regionMoved(mra.region, mra.fromServer, mra.toServer); + break; + case SWAP_REGIONS: + SwapRegionsAction a = (SwapRegionsAction) action; + regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion); + regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion); + regionMoved(a.fromRegion, a.fromServer, a.toServer); + regionMoved(a.toRegion, a.toServer, a.fromServer); + break; + default: + throw new RuntimeException("Uknown action:" + action.type); + } + } + + /** + * Return true if the placement of region on server would lower the availability + * of the region in question + * @param server + * @param region + * @return true or false + */ + boolean wouldLowerAvailability(HRegionInfo regionInfo, ServerName serverName) { + if (!serversToIndex.containsKey(serverName.getHostAndPort())) { + return false; // safeguard against race between cluster.servers and servers from LB method args + } + int server = serversToIndex.get(serverName.getHostAndPort()); + int region = regionsToIndex.get(regionInfo); + + int primary = regionIndexToPrimaryIndex[region]; + + // there is a subset relation for server < host < rack + // check server first + + if (contains(primariesOfRegionsPerServer[server], primary)) { + // check for whether there are other servers that we can place this region + for (int i = 0; i < primariesOfRegionsPerServer.length; i++) { + if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) { + return true; // meaning there is a better server + } + } + return false; // 
there is not a better server to place this + } + + // check host + if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host + int host = serverIndexToHostIndex[server]; + if (contains(primariesOfRegionsPerHost[host], primary)) { + // check for whether there are other hosts that we can place this region + for (int i = 0; i < primariesOfRegionsPerHost.length; i++) { + if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) { + return true; // meaning there is a better host + } + } + return false; // there is not a better host to place this + } + } + + // check rack + if (numRacks > 1) { + int rack = serverIndexToRackIndex[server]; + if (contains(primariesOfRegionsPerRack[rack], primary)) { + // check for whether there are other racks that we can place this region + for (int i = 0; i < primariesOfRegionsPerRack.length; i++) { + if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) { + return true; // meaning there is a better rack + } + } + return false; // there is not a better rack to place this + } + } + return false; + } + + void doAssignRegion(HRegionInfo regionInfo, ServerName serverName) { + if (!serversToIndex.containsKey(serverName.getHostAndPort())) { + return; + } + int server = serversToIndex.get(serverName.getHostAndPort()); + int region = regionsToIndex.get(regionInfo); + doAction(new AssignRegionAction(region, server)); + } + + void regionMoved(int region, int oldServer, int newServer) { + regionIndexToServerIndex[region] = newServer; + if (initialRegionIndexToServerIndex[region] == newServer) { numMovedRegions--; //region moved back to original location - if (shouldBeOnMaster(regions[regionIndex]) && isActiveMaster(newServerIndex)) { - // Master hosted region moved back to the active master + if (shouldBeOnMaster(regions[region]) && isActiveMaster(newServer)) { + //Master hosted region moved back to the active master numMovedMasterHostedRegions--; } - } else if (initialRegionIndexToServerIndex[regionIndex] == oldServerIndex) { + } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { numMovedRegions++; //region moved from original location - if (shouldBeOnMaster(regions[regionIndex]) && isActiveMaster(oldServerIndex)) { + if (shouldBeOnMaster(regions[region]) && isActiveMaster(oldServer)) { // Master hosted region moved away from active the master numMovedMasterHostedRegions++; } } - int tableIndex = regionIndexToTableIndex[regionIndex]; - numRegionsPerServerPerTable[oldServerIndex][tableIndex]--; - numRegionsPerServerPerTable[newServerIndex][tableIndex]++; + int tableIndex = regionIndexToTableIndex[region]; + if (oldServer >= 0) { + numRegionsPerServerPerTable[oldServer][tableIndex]--; + } + numRegionsPerServerPerTable[newServer][tableIndex]++; //check whether this caused maxRegionsPerTable in the new Server to be updated - if (numRegionsPerServerPerTable[newServerIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) { - numRegionsPerServerPerTable[newServerIndex][tableIndex] = numMaxRegionsPerTable[tableIndex]; - } else if ((numRegionsPerServerPerTable[oldServerIndex][tableIndex] + 1) + if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) { + numRegionsPerServerPerTable[newServer][tableIndex] = numMaxRegionsPerTable[tableIndex]; + } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1) == numMaxRegionsPerTable[tableIndex]) { //recompute maxRegionsPerTable since the previous value was coming from the old 
server for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) { @@ -313,6 +675,54 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } } + + // update for servers + int primary = regionIndexToPrimaryIndex[region]; + if (oldServer >= 0) { + primariesOfRegionsPerServer[oldServer] = removeRegion( + primariesOfRegionsPerServer[oldServer], primary); + } + primariesOfRegionsPerServer[newServer] = addRegionSorted( + primariesOfRegionsPerServer[newServer], primary); + + // update for hosts + if (multiServersPerHost) { + int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1; + int newHost = serverIndexToHostIndex[newServer]; + if (newHost != oldHost) { + regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region); + primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary); + if (oldHost >= 0) { + regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region); + primariesOfRegionsPerHost[oldHost] = removeRegion( + primariesOfRegionsPerHost[oldHost], primary); // will still be sorted + } + } + } + + // update for racks + if (numRacks > 1) { + int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1; + int newRack = serverIndexToRackIndex[newServer]; + if (newRack != oldRack) { + regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region); + primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary); + if (oldRack >= 0) { + regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region); + primariesOfRegionsPerRack[oldRack] = removeRegion( + primariesOfRegionsPerRack[oldRack], primary); // will still be sorted + } + } + } + if (oldServer >= 0 && isActiveMaster(oldServer)) { + if (!shouldBeOnMaster(regions[region])) { + numUserRegionsOnMaster--; + } + } else if (isActiveMaster(newServer)) { + if (!shouldBeOnMaster(regions[region])) { + numUserRegionsOnMaster++; + } + } } int[] removeRegion(int[] regions, int regionIndex) { @@ -336,6 +746,21 @@ public abstract class BaseLoadBalancer implements LoadBalancer { return newRegions; } + int[] addRegionSorted(int[] regions, int regionIndex) { + int[] newRegions = new int[regions.length + 1]; + int i = 0; + for (i = 0; i < regions.length; i++) { // find the index to insert + if (regions[i] > regionIndex) { + break; + } + } + System.arraycopy(regions, 0, newRegions, 0, i); // copy first half + System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half + newRegions[i] = regionIndex; + + return newRegions; + } + int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { int i = 0; for (i = 0; i < regions.length; i++) { @@ -368,6 +793,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer { region.getTable().getNameAsString()); } + boolean contains(int[] arr, int val) { + return Arrays.binarySearch(arr, val) >= 0; + } + private Comparator numRegionsComparator = new Comparator() { @Override public int compare(Integer integer, Integer integer2) { @@ -411,6 +840,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { // slop for regions protected float slop; protected Configuration config; + protected RackManager rackManager; private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class); @@ -480,6 +910,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { tablesOnMaster.add(table); } } + 
this.rackManager = new RackManager(getConf()); + regionFinder.setConf(conf); } protected void setSlop(Configuration conf) { @@ -580,6 +1012,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { // Assume there won't be too much backup masters // re/starting, so this won't leak much memory. excludedServers.addAll(st.getBackupMasters()); + regionFinder.setClusterStatus(st); } @Override @@ -587,6 +1020,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer { masterServerName = masterServices.getServerName(); excludedServers.remove(masterServerName); this.services = masterServices; + this.regionFinder.setServices(masterServices); + } + + public void setRackManager(RackManager rackManager) { + this.rackManager = rackManager; } protected Collection getBackupMasters() { @@ -601,6 +1039,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } return false; } + // TODO: check for co-located region replicas as well + // Check if we even need to do any load balancing // HBASE-3681 check sloppiness first float average = cs.getLoadAverage(); // for logging @@ -653,6 +1093,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer { LOG.warn("Wanted to do round robin assignment but no servers to assign to"); return null; } + + // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the + // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate + // generator for AssignRegionAction. The LB will ensure the regions are mostly local + // and balanced. This should also run fast with fewer number of iterations. + Map> assignments = new TreeMap>(); if (numServers + numBackupMasters == 1) { // Only one server, nothing fancy we can do here ServerName server = numServers > 0 ? 
servers.get(0) : backupMasters.get(0); @@ -668,6 +1114,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer { numServers = 0; } } + + Cluster cluster = createCluster(servers, regions, backupMasters, tablesOnMaster); + List unassignedRegions = new ArrayList(); + int total = regions.size(); // Get the number of regions to be assigned // to backup masters based on the weight @@ -675,21 +1125,87 @@ public abstract class BaseLoadBalancer implements LoadBalancer { / (numServers * backupMasterWeight + numBackupMasters); if (numRegions > 0) { // backupMasters can't be null, according to the formula, numBackupMasters != 0 - roundRobinAssignment(regions, 0, + roundRobinAssignment(cluster, regions, unassignedRegions, 0, numRegions, backupMasters, masterRegions, assignments); } int remainder = total - numRegions; if (remainder > 0) { // servers can't be null, or contains the master only since numServers != 0 - roundRobinAssignment(regions, numRegions, remainder, + roundRobinAssignment(cluster, regions, unassignedRegions, numRegions, remainder, servers, masterRegions, assignments); } if (masterRegions != null && !masterRegions.isEmpty()) { assignments.put(masterServerName, masterRegions); + for (HRegionInfo r : masterRegions) { + cluster.doAssignRegion(r, masterServerName); + } + } + List lastFewRegions = new ArrayList(); + // assign the remaining by going through the list and try to assign to servers one-by-one + int serverIdx = RANDOM.nextInt(numServers); + for (HRegionInfo region : unassignedRegions) { + for (int j = 0; j < numServers; j++) { // try all servers one by one + ServerName serverName = servers.get((j + serverIdx) % numServers); + if (serverName.equals(masterServerName)) { + continue; + } + if (!cluster.wouldLowerAvailability(region, serverName)) { + List serverRegions = assignments.get(serverName); + if (serverRegions == null) { + serverRegions = new ArrayList(); + assignments.put(serverName, serverRegions); + } + serverRegions.add(region); + cluster.doAssignRegion(region, serverName); + serverIdx = (j + serverIdx + 1) % numServers; //remain from next server + break; + } else { + lastFewRegions.add(region); + } + } + } + // just sprinkle the rest of the regions on random regionservers. The balanceCluster will + // make it optimal later. we can end up with this if numReplicas > numServers. + for (HRegionInfo region : lastFewRegions) { + ServerName server = null; + if (numServers == 0) { + // select from backup masters + int i = RANDOM.nextInt(backupMasters.size()); + server = backupMasters.get(i); + } else { + do { + int i = RANDOM.nextInt(numServers); + server = servers.get(i); + } while (numServers > 1 && server.equals(masterServerName)); + } + List serverRegions = assignments.get(server); + if (serverRegions == null) { + serverRegions = new ArrayList(); + assignments.put(server, serverRegions); + } + serverRegions.add(region); + cluster.doAssignRegion(region, server); } return assignments; } + protected Cluster createCluster(List servers, + Collection regions, List backupMasters, Set tablesOnMaster) { + // Get the snapshot of the current assignments for the regions in question, and then create + // a cluster out of it. Note that we might have replicas already assigned to some servers + // earlier. So we want to get the snapshot to see those assignments, but this will only contain + // replicas of the regions that are passed (for performance). 
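// To make that concrete (a hedged usage sketch — "replica", "candidate" and the surrounding
// loop are hypothetical and simplified from the assignment methods in this class): once the
// Cluster is built from this snapshot, callers probe a placement before committing it:
//
//   for (ServerName candidate : servers) {
//     if (!cluster.wouldLowerAvailability(replica, candidate)) {
//       assignments.get(candidate).add(replica);     // record the plan for this server
//       cluster.doAssignRegion(replica, candidate);  // keep the in-memory model in sync
//       break;
//     }
//   }
//
// so a second replica of the same region only ends up co-hosted when no better server exists.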
+ Map> clusterState = getRegionAssignmentsByServer(regions); + + for (ServerName server : servers) { + if (!clusterState.containsKey(server)) { + clusterState.put(server, EMPTY_REGION_LIST); + } + } + return new Cluster(masterServerName, regions, clusterState, null, this.regionFinder, backupMasters, + tablesOnMaster, rackManager); + } + /** * Generates an immediate assignment plan to be used by a new master for * regions in transition that do not have an already known destination. @@ -717,9 +1233,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } Map assignments = new TreeMap(); - List backupMasters = normalizeServers(servers); for (HRegionInfo region : regions) { - assignments.put(region, randomAssignment(region, servers, backupMasters)); + assignments.put(region, randomAssignment(region, servers)); } return assignments; } @@ -734,8 +1249,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer { LOG.warn("Wanted to do random assignment but no servers to assign to"); return null; } - return randomAssignment(regionInfo, servers, - normalizeServers(servers)); + List backupMasters = normalizeServers(servers); + List regions = Lists.newArrayList(regionInfo); + Cluster cluster = createCluster(servers, regions, backupMasters, tablesOnMaster); + + return randomAssignment(cluster, regionInfo, servers, backupMasters); } /** @@ -807,6 +1325,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int numRandomAssignments = 0; int numRetainedAssigments = 0; + + Cluster cluster = createCluster(servers, regions.keySet(), backupMasters, tablesOnMaster); + for (Map.Entry entry : regions.entrySet()) { HRegionInfo region = entry.getKey(); ServerName oldServerName = entry.getValue(); @@ -824,28 +1345,34 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } else if (localServers.isEmpty()) { // No servers on the new cluster match up with this hostname, // assign randomly. 
- ServerName randomServer = randomAssignment(region, servers, backupMasters); + ServerName randomServer = randomAssignment(cluster, region, servers, backupMasters); assignments.get(randomServer).add(region); numRandomAssignments++; if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname()); } else if (localServers.size() == 1) { // the usual case - one new server on same host - assignments.get(localServers.get(0)).add(region); + ServerName target = localServers.get(0); + assignments.get(target).add(region); + cluster.doAssignRegion(region, target); numRetainedAssigments++; } else { // multiple new servers in the cluster on this same host - ServerName target = null; - for (ServerName tmp: localServers) { - if (tmp.getPort() == oldServerName.getPort()) { - target = tmp; - break; + if (localServers.contains(oldServerName)) { + assignments.get(oldServerName).add(region); + cluster.doAssignRegion(region, oldServerName); + } else { + ServerName target = null; + for (ServerName tmp: localServers) { + if (tmp.getPort() == oldServerName.getPort()) { + target = tmp; + break; + } } + if (target == null) { + target = randomAssignment(cluster, region, localServers, backupMasters); + } + assignments.get(target).add(region); } - if (target == null) { - int size = localServers.size(); - target = localServers.get(RANDOM.nextInt(size)); - } - assignments.get(target).add(region); numRetainedAssigments++; } } @@ -924,7 +1451,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { * only backup masters that are intended to host this region, i.e, it * may not have all the backup masters. */ - private ServerName randomAssignment(HRegionInfo regionInfo, + private ServerName randomAssignment(Cluster cluster, HRegionInfo regionInfo, List servers, List backupMasters) { int numServers = servers == null ? 0 : servers.size(); int numBackupMasters = backupMasters == null ? 0 : backupMasters.size(); @@ -936,34 +1463,45 @@ public abstract class BaseLoadBalancer implements LoadBalancer { && servers.contains(masterServerName)) { return masterServerName; } - // Generate a random number weighted more towards - // regular regionservers instead of backup masters. - // This formula is chosen for simplicity. - int i = RANDOM.nextInt( - numBackupMasters + numServers * backupMasterWeight); - if (i < numBackupMasters) { - return backupMasters.get(i); - } - i = (i - numBackupMasters)/backupMasterWeight; - ServerName sn = servers.get(i); - if (sn.equals(masterServerName)) { - // Try to avoid master for a user region - if (numServers > 1) { - i = (i == 0 ? 1 : i - 1); - sn = servers.get(i); - } else if (numBackupMasters > 0) { - sn = backupMasters.get(0); + ServerName sn = null; + final int maxIterations = servers.size() * 4; + int iterations = 0; + + do { + // Generate a random number weighted more towards + // regular regionservers instead of backup masters. + // This formula is chosen for simplicity. + int i = RANDOM.nextInt( + numBackupMasters + numServers * backupMasterWeight); + if (i < numBackupMasters) { + sn = backupMasters.get(i); + continue; } - } + i = (i - numBackupMasters)/backupMasterWeight; + sn = servers.get(i); + if (sn.equals(masterServerName)) { + // Try to avoid master for a user region + if (numServers > 1) { + i = (i == 0 ? 
1 : i - 1); + sn = servers.get(i); + } else if (numBackupMasters > 0) { + sn = backupMasters.get(0); + } + } + } while (cluster.wouldLowerAvailability(regionInfo, sn) + && iterations++ < maxIterations); + cluster.doAssignRegion(regionInfo, sn); return sn; } /** * Round robin a chunk of a list of regions to a list of servers */ - private void roundRobinAssignment(List regions, int offset, + private void roundRobinAssignment(Cluster cluster, List regions, + List unassignedRegions, int offset, int numRegions, List servers, List masterRegions, Map> assignments) { + boolean masterIncluded = servers.contains(masterServerName); int numServers = servers.size(); int skipServers = numServers; @@ -971,8 +1509,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer { skipServers--; } int max = (int) Math.ceil((float) numRegions / skipServers); - int serverIdx = RANDOM.nextInt(numServers); + int serverIdx = 0; + if (numServers > 1) { + serverIdx = RANDOM.nextInt(numServers); + } int regionIdx = 0; + for (int j = 0; j < numServers; j++) { ServerName server = servers.get((j + serverIdx) % numServers); if (masterIncluded && server.equals(masterServerName)) { @@ -984,7 +1526,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer { for (int i = regionIdx; i < numRegions; i += skipServers) { HRegionInfo region = regions.get(offset + i % numRegions); if (masterRegions == null || !shouldBeOnMaster(region)) { - serverRegions.add(region); + if (cluster.wouldLowerAvailability(region, server)) { + unassignedRegions.add(region); + } else { + serverRegions.add(region); + cluster.doAssignRegion(region, server); + } continue; } // Master is in the list and this is a special region @@ -994,4 +1541,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionIdx++; } } + + protected Map> getRegionAssignmentsByServer( + Collection regions) { + if (this.services != null && this.services.getAssignmentManager() != null) { + return this.services.getAssignmentManager().getSnapShotOfAssignment(regions); + } else { + return new HashMap>(); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java index a2730c5cafa..9cf995af0d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java @@ -62,6 +62,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer { @Override public void setConf(Configuration conf) { + super.setConf(conf); globalFavoredNodesAssignmentPlan = new FavoredNodesPlan(); this.rackManager = new RackManager(conf); super.setConf(conf); @@ -80,7 +81,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer { LOG.warn("Not running balancer since exception was thrown " + ie); return plans; } - globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan(); + globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan(); Map serverNameToServerNameWithoutCode = new HashMap(); Map serverNameWithoutCodeToServerName = @@ -133,7 +134,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer { destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2)); } } - + if (destination != null) { RegionPlan plan = new RegionPlan(region, currentServer, destination); plans.add(plan); @@ -159,7 
+160,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer { // one of the favored node is still alive. In this case, try to adhere // to the current favored nodes assignment as much as possible - i.e., // if the current primary is gone, then make the secondary or tertiary - // as the new host for the region (based on their current load). + // as the new host for the region (based on their current load). // Note that we don't change the favored // node assignments here (even though one or more favored node is currently // down). It is up to the balanceCluster to do this hard work. The HDFS @@ -222,7 +223,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer { } } - private Pair>, List> + private Pair>, List> segregateRegionsAndAssignRegionsWithFavoredNodes(List regions, List availableServers) { Map> assignmentMapForFavoredNodes = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index 690d8c9e3c7..3da4110c54d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -147,7 +147,7 @@ class RegionLocationFinder { protected HTableDescriptor getTableDescriptor(TableName tableName) throws IOException { HTableDescriptor tableDescriptor = null; try { - if (this.services != null) { + if (this.services != null && this.services.getTableDescriptors() != null) { tableDescriptor = this.services.getTableDescriptors().get(tableName); } } catch (FileNotFoundException fnfe) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index e58e4863cc8..1d98cdd01a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.balancer; import java.util.ArrayDeque; +import java.util.Arrays; import java.util.Collection; import java.util.Deque; import java.util.HashMap; @@ -37,11 +38,16 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; /** *

This is a best effort load balancer. Given a Cost function F(C) => x It will @@ -89,19 +95,18 @@ import org.apache.hadoop.hbase.util.Pair; @InterfaceAudience.Private public class StochasticLoadBalancer extends BaseLoadBalancer { - private static final String STEPS_PER_REGION_KEY = + protected static final String STEPS_PER_REGION_KEY = "hbase.master.balancer.stochastic.stepsPerRegion"; - private static final String MAX_STEPS_KEY = + protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps"; - private static final String MAX_RUNNING_TIME_KEY = + protected static final String MAX_RUNNING_TIME_KEY = "hbase.master.balancer.stochastic.maxRunningTime"; - private static final String KEEP_REGION_LOADS = + protected static final String KEEP_REGION_LOADS = "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class); - private final RegionLocationFinder regionFinder = new RegionLocationFinder(); Map> loads = new HashMap>(); // values are defaults @@ -110,20 +115,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { private long maxRunningTime = 30 * 1000 * 1; // 30 seconds. private int numRegionLoadsToRemember = 15; - private RegionPicker[] pickers; + private CandidateGenerator[] candidateGenerators; private CostFromRegionLoadFunction[] regionLoadFunctions; private CostFunction[] costFunctions; // Keep locality based picker and cost function to alert them // when new services are offered - private LocalityBasedPicker localityPicker; + private LocalityBasedCandidateGenerator localityCandidateGenerator; private LocalityCostFunction localityCost; @Override public void setConf(Configuration conf) { super.setConf(conf); - regionFinder.setConf(conf); - maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps); stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion); @@ -131,13 +134,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember); - localityPicker = new LocalityBasedPicker(services); + localityCandidateGenerator = new LocalityBasedCandidateGenerator(services); localityCost = new LocalityCostFunction(conf, services); - pickers = new RegionPicker[] { - new RandomRegionPicker(), - new LoadPicker(), - localityPicker + candidateGenerators = new CandidateGenerator[] { + new RandomCandidateGenerator(), + new LoadCandidateGenerator(), + localityCandidateGenerator, + new RegionReplicaCandidateGenerator(), }; regionLoadFunctions = new CostFromRegionLoadFunction[] { @@ -152,6 +156,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { new MoveCostFunction(conf), localityCost, new TableSkewCostFunction(conf), + new RegionReplicaHostCostFunction(conf), + new RegionReplicaRackCostFunction(conf), regionLoadFunctions[0], regionLoadFunctions[1], regionLoadFunctions[2], @@ -167,7 +173,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { @Override public void setClusterStatus(ClusterStatus st) { super.setClusterStatus(st); - regionFinder.setClusterStatus(st); updateRegionLoad(); for(CostFromRegionLoadFunction cost : regionLoadFunctions) { cost.setClusterStatus(st); @@ -177,9 +182,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { @Override public void setMasterServices(MasterServices masterServices) { super.setMasterServices(masterServices); - 
this.regionFinder.setServices(masterServices); this.localityCost.setServices(masterServices); - this.localityPicker.setServices(masterServices); + this.localityCandidateGenerator.setServices(masterServices); } @@ -202,8 +206,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { long startTime = EnvironmentEdgeManager.currentTimeMillis(); // Keep track of servers to iterate through them. - Cluster cluster = new Cluster(masterServerName, clusterState, - loads, regionFinder, getBackupMasters(), tablesOnMaster); + Cluster cluster = new Cluster(masterServerName, + clusterState, loads, regionFinder, getBackupMasters(), tablesOnMaster, rackManager); + initCosts(cluster); + double currentCost = computeCost(cluster, Double.MAX_VALUE); double initCost = currentCost; @@ -213,42 +219,30 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers)); // Perform a stochastic walk to see if we can get a good fit. long step; + for (step = 0; step < computedMaxSteps; step++) { - int pickerIdx = RANDOM.nextInt(pickers.length); - RegionPicker p = pickers[pickerIdx]; - Pair, Pair> picks = p.pick(cluster); + int generatorIdx = RANDOM.nextInt(candidateGenerators.length); + CandidateGenerator p = candidateGenerators[generatorIdx]; + Cluster.Action action = p.generate(cluster); - int leftServer = picks.getFirst().getFirst(); - int leftRegion = picks.getFirst().getSecond(); - int rightServer = picks.getSecond().getFirst(); - int rightRegion = picks.getSecond().getSecond(); - - // We couldn't find a server - if (rightServer < 0 || leftServer < 0) { + if (action.type == Type.NULL) { continue; } - // We randomly picked to do nothing. - if (leftRegion < 0 && rightRegion < 0) { - continue; - } - - cluster.moveOrSwapRegion(leftServer, - rightServer, - leftRegion, - rightRegion); + cluster.doAction(action); + updateCostsWithAction(cluster, action); newCost = computeCost(cluster, currentCost); + // Should this be kept? if (newCost < currentCost) { currentCost = newCost; } else { // Put things back the way they were before. - // TODO: undo by remembering old values, using an UndoAction class - cluster.moveOrSwapRegion(leftServer, - rightServer, - rightRegion, - leftRegion); + // TODO: undo by remembering old values + Action undoAction = action.undoAction(); + cluster.doAction(undoAction); + updateCostsWithAction(cluster, undoAction); } if (EnvironmentEdgeManager.currentTimeMillis() - startTime > @@ -343,6 +337,17 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } + protected void initCosts(Cluster cluster) { + for (CostFunction c:costFunctions) { + c.init(cluster); + } + } + + protected void updateCostsWithAction(Cluster cluster, Action action) { + for (CostFunction c : costFunctions) { + c.postAction(action); + } + } /** * This is the main cost function. 
It will compute a cost associated with a proposed cluster @@ -361,7 +366,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { continue; } - total += c.getMultiplier() * c.cost(cluster); + total += c.getMultiplier() * c.cost(); if (total > previousCost) { return total; @@ -370,8 +375,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return total; } - abstract static class RegionPicker { - abstract Pair, Pair> pick(Cluster cluster); + /** Generates a candidate action to be applied to the cluster for cost function search */ + abstract static class CandidateGenerator { + abstract Cluster.Action generate(Cluster cluster); /** * From a list of regions pick a random one. Null can be returned which @@ -402,6 +408,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return RANDOM.nextInt(cluster.numServers); } + protected int pickOtherRandomServer(Cluster cluster, int serverIndex) { if (cluster.numServers < 2) { return -1; @@ -414,11 +421,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } - protected Pair pickRandomRegions(Cluster cluster, + protected Cluster.Action pickRandomRegions(Cluster cluster, int thisServer, int otherServer) { if (thisServer < 0 || otherServer < 0) { - return new Pair(-1, -1); + return Cluster.NullAction; } // Decide who is most likely to need another region @@ -432,45 +439,50 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { int thisRegion = pickRandomRegion(cluster, thisServer, thisChance); int otherRegion = pickRandomRegion(cluster, otherServer, otherChance); - return new Pair(thisRegion, otherRegion); + return getAction(thisServer, thisRegion, otherServer, otherRegion); + } + + protected Cluster.Action getAction (int fromServer, int fromRegion, + int toServer, int toRegion) { + if (fromServer < 0 || toServer < 0) { + return Cluster.NullAction; + } + if (fromRegion > 0 && toRegion > 0) { + return new Cluster.SwapRegionsAction(fromServer, fromRegion, + toServer, toRegion); + } else if (fromRegion > 0) { + return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer); + } else if (toRegion > 0) { + return new Cluster.MoveRegionAction(toRegion, toServer, fromServer); + } else { + return Cluster.NullAction; + } } } - static class RandomRegionPicker extends RegionPicker { + static class RandomCandidateGenerator extends CandidateGenerator { @Override - Pair, Pair> pick(Cluster cluster) { + Cluster.Action generate(Cluster cluster) { int thisServer = pickRandomServer(cluster); // Pick the other server int otherServer = pickOtherRandomServer(cluster, thisServer); - Pair regions = pickRandomRegions(cluster, thisServer, otherServer); - - return new Pair, Pair>( - new Pair(thisServer, regions.getFirst()), - new Pair(otherServer, regions.getSecond()) - - ); + return pickRandomRegions(cluster, thisServer, otherServer); } - } - public static class LoadPicker extends RegionPicker { + public static class LoadCandidateGenerator extends CandidateGenerator { @Override - Pair, Pair> pick(Cluster cluster) { + Cluster.Action generate(Cluster cluster) { cluster.sortServersByRegionCount(); int thisServer = pickMostLoadedServer(cluster, -1); int otherServer = pickLeastLoadedServer(cluster, thisServer); - Pair regions = pickRandomRegions(cluster, thisServer, otherServer); - return new Pair, Pair>( - new Pair(thisServer, regions.getFirst()), - new Pair(otherServer, regions.getSecond()) - - ); + return pickRandomRegions(cluster, thisServer, otherServer); } private int pickLeastLoadedServer(final Cluster 
cluster, int thisServer) { @@ -500,21 +512,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } - static class LocalityBasedPicker extends RegionPicker { + static class LocalityBasedCandidateGenerator extends CandidateGenerator { private MasterServices masterServices; - LocalityBasedPicker(MasterServices masterServices) { + LocalityBasedCandidateGenerator(MasterServices masterServices) { this.masterServices = masterServices; } @Override - Pair, Pair> pick(Cluster cluster) { + Cluster.Action generate(Cluster cluster) { if (this.masterServices == null) { - return new Pair, Pair>( - new Pair(-1,-1), - new Pair(-1,-1) - ); + return Cluster.NullAction; } // Pick a random region server int thisServer = pickRandomServer(cluster); @@ -523,10 +532,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f); if (thisRegion == -1) { - return new Pair, Pair>( - new Pair(-1,-1), - new Pair(-1,-1) - ); + return Cluster.NullAction; } // Pick the server with the highest locality @@ -535,10 +541,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { // pick an region on the other server to potentially swap int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f); - return new Pair, Pair>( - new Pair(thisServer,thisRegion), - new Pair(otherServer,otherRegion) - ); + return getAction(thisServer, thisRegion, otherServer, otherRegion); } private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) { @@ -563,6 +566,79 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } + /** + * Generates candidates which moves the replicas out of the region server for + * co-hosted region replicas + */ + public static class RegionReplicaCandidateGenerator extends CandidateGenerator { + + RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator(); + + @Override + Cluster.Action generate(Cluster cluster) { + + int serverIndex = pickRandomServer(cluster); + + if (cluster.numServers <= 1 || serverIndex == -1) { + return Cluster.NullAction; + } + + // randomly select one primaryIndex out of all region replicas in the same server + // we don't know how many region replicas are co-hosted, we will randomly select one + // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html) + int currentPrimary = -1; + int currentPrimaryIndex = -1; + int primaryIndex = -1; + double currentLargestRandom = -1; + // regionsByPrimaryPerServer is a sorted array. Since it contains the primary region + // ids for the regions hosted in server, a consecutive repetition means that replicas + // are co-hosted + for (int j = 0; j <= cluster.primariesOfRegionsPerServer[serverIndex].length; j++) { + int primary = j < cluster.primariesOfRegionsPerServer[serverIndex].length + ? 
cluster.primariesOfRegionsPerServer[serverIndex][j] : -1;
+ if (primary != currentPrimary) { // check whether we see a new primary
+ int numReplicas = j - currentPrimaryIndex;
+ if (numReplicas > 1) { // means consecutive primaries, indicating co-location
+ // decide to select this primary region id or not
+ double currentRandom = RANDOM.nextDouble();
+ if (currentRandom > currentLargestRandom) {
+ primaryIndex = currentPrimary; // select this primary
+ currentLargestRandom = currentRandom;
+ }
+ }
+ currentPrimary = primary;
+ currentPrimaryIndex = j;
+ }
+ }
+
+ // if there are no pairs of region replicas co-hosted, default to the random generator
+ if (primaryIndex == -1) {
+ // default to the random candidate generator
+ return randomGenerator.generate(cluster);
+ }
+
+ // we have found the primary id for the region to move. Now find the actual regionIndex
+ // with the given primary, preferring to move the secondary region.
+ int regionIndex = -1;
+ for (int k = 0; k < cluster.regionsPerServer[serverIndex].length; k++) {
+ int region = cluster.regionsPerServer[serverIndex][k];
+ if (primaryIndex == cluster.regionIndexToPrimaryIndex[region]) {
+ // always move the secondary, not the primary
+ if (!RegionReplicaUtil.isDefaultReplica(cluster.regions[region])) {
+ regionIndex = region;
+ break;
+ }
+ }
+ }
+
+ int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
+
+ int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
+
+ return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
+ }
+ }
+
/**
* Base class of StochasticLoadBalancer's Cost Functions.
*/
@@ -570,6 +646,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private float multiplier = 0;
+ protected Cluster cluster;
+
CostFunction(Configuration c) {
}
@@ -582,7 +660,42 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
this.multiplier = m;
}
- abstract double cost(Cluster cluster);
+ /** Called once per LB invocation to give the cost function
+ * a chance to initialize its state, and perform any costly calculation.
+ */
+ void init(Cluster cluster) {
+ this.cluster = cluster;
+ }
+
+ /** Called once per cluster Action to give the cost function
+ * an opportunity to update its state. postAction() is always
+ * called at least once before cost() is called with the cluster
+ * that this action is performed on. */
+ void postAction(Action action) {
+ switch (action.type) {
+ case NULL: break;
+ case ASSIGN_REGION:
+ AssignRegionAction ar = (AssignRegionAction) action;
+ regionMoved(ar.region, -1, ar.server);
+ break;
+ case MOVE_REGION:
+ MoveRegionAction mra = (MoveRegionAction) action;
+ regionMoved(mra.region, mra.fromServer, mra.toServer);
+ break;
+ case SWAP_REGIONS:
+ SwapRegionsAction a = (SwapRegionsAction) action;
+ regionMoved(a.fromRegion, a.fromServer, a.toServer);
+ regionMoved(a.toRegion, a.toServer, a.fromServer);
+ break;
+ default:
+ throw new RuntimeException("Unknown action:" + action.type);
+ }
+ }
+
+ protected void regionMoved(int region, int oldServer, int newServer) {
+ }
+
+ abstract double cost();
/**
* Function to compute a scaled cost using {@link DescriptiveStatistics}.
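The init()/postAction()/regionMoved()/cost() contract just added is what lets every cost function keep cached state instead of rescanning the whole cluster for each candidate action. A hypothetical subclass, shown purely to illustrate the calling convention (it is not part of this patch):

// Hypothetical example of the incremental CostFunction contract.
static class ExampleRegionCountCostFunction extends CostFunction {
  private double[] regionsPerServer;           // cached once per balancer run

  ExampleRegionCountCostFunction(Configuration conf) {
    super(conf);
    this.setMultiplier(1);
  }

  @Override
  void init(Cluster cluster) {
    super.init(cluster);                       // keeps the cluster reference
    regionsPerServer = new double[cluster.numServers];
    for (int i = 0; i < cluster.numServers; i++) {
      regionsPerServer[i] = cluster.regionsPerServer[i].length;
    }
  }

  @Override
  protected void regionMoved(int region, int oldServer, int newServer) {
    if (oldServer >= 0) {
      regionsPerServer[oldServer]--;           // cheap incremental update
    }
    regionsPerServer[newServer]++;
  }

  @Override
  double cost() {
    // toy normalization: share of all regions carried by the most loaded server
    double max = 0, total = 0;
    for (double v : regionsPerServer) {
      max = Math.max(max, v);
      total += v;
    }
    return total == 0 ? 0 : max / total;
  }
}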
It @@ -611,8 +724,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return scaled; } - - private double getSum(double[] stats) { double total = 0; for(double s:stats) { @@ -663,7 +774,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } @Override - double cost(Cluster cluster) { + double cost() { // Try and size the max number of Moves, but always be prepared to move some. int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent), DEFAULT_MAX_MOVES); @@ -705,7 +816,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } @Override - double cost(Cluster cluster) { + double cost() { if (stats == null || stats.length != cluster.numServers) { stats = new double[cluster.numServers]; } @@ -740,7 +851,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } @Override - double cost(Cluster cluster) { + double cost() { double max = cluster.numRegions; double min = ((double) cluster.numRegions) / cluster.numServers; double value = 0; @@ -775,7 +886,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } @Override - double cost(Cluster cluster) { + double cost() { double max = 0; double cost = 0; @@ -834,9 +945,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { this.loads = l; } - @Override - double cost(Cluster cluster) { + double cost() { if (clusterStatus == null || loads == null) { return 0; } @@ -930,6 +1040,165 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } + /** + * A cost function for region replicas. We give a very high cost to hosting + * replicas of the same region in the same host. We do not prevent the case + * though, since if numReplicas > numRegionServers, we still want to keep the + * replica open. + */ + public static class RegionReplicaHostCostFunction extends CostFunction { + private static final String REGION_REPLICA_HOST_COST_KEY = + "hbase.master.balancer.stochastic.regionReplicaHostCostKey"; + private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000; + + long maxCost = 0; + long[] costsPerGroup; // group is either server, host or rack + int[][] primariesOfRegionsPerGroup; + + public RegionReplicaHostCostFunction(Configuration conf) { + super(conf); + this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY, + DEFAULT_REGION_REPLICA_HOST_COST_KEY)); + } + + @Override + void init(Cluster cluster) { + super.init(cluster); + // max cost is the case where every region replica is hosted together regardless of host + maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0; + costsPerGroup = new long[cluster.numHosts]; + primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based + ? 
cluster.primariesOfRegionsPerHost + : cluster.primariesOfRegionsPerServer; + for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) { + costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]); + } + } + + long getMaxCost(Cluster cluster) { + if (!cluster.hasRegionReplicas) { + return 0; // short circuit + } + // max cost is the case where every region replica is hosted together regardless of host + int[] primariesOfRegions = new int[cluster.numRegions]; + for (int i = 0; i < cluster.regions.length; i++) { + // assume all regions are hosted by only one server + int primaryIndex = cluster.regionIndexToPrimaryIndex[i]; + primariesOfRegions[i] = primaryIndex; + } + + Arrays.sort(primariesOfRegions); + + // compute numReplicas from the sorted array + return costPerGroup(primariesOfRegions); + } + + @Override + double cost() { + if (maxCost <= 0) { + return 0; + } + + long totalCost = 0; + for (int i = 0 ; i < costsPerGroup.length; i++) { + totalCost += costsPerGroup[i]; + } + return scale(0, maxCost, totalCost); + } + + /** + * For each primary region, it computes the total number of replicas in the array (numReplicas) + * and returns a sum of numReplicas-1 squared. For example, if the server hosts + * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it + * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1). + * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted + * @return a sum of numReplicas-1 squared for each primary region in the group. + */ + protected long costPerGroup(int[] primariesOfRegions) { + long cost = 0; + int currentPrimary = -1; + int currentPrimaryIndex = -1; + // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions + // sharing the same primary will have consecutive numbers in the array. + for (int j = 0 ; j <= primariesOfRegions.length; j++) { + int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1; + if (primary != currentPrimary) { // we see a new primary + int numReplicas = j - currentPrimaryIndex; + // square the cost + if (numReplicas > 1) { // means consecutive primaries, indicating co-location + cost += (numReplicas - 1) * (numReplicas - 1); + } + currentPrimary = primary; + currentPrimaryIndex = j; + } + } + + return cost; + } + + @Override + protected void regionMoved(int region, int oldServer, int newServer) { + if (maxCost <= 0) { + return; // no need to compute + } + if (cluster.multiServersPerHost) { + int oldHost = cluster.serverIndexToHostIndex[oldServer]; + int newHost = cluster.serverIndexToHostIndex[newServer]; + if (newHost != oldHost) { + costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]); + costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]); + } + } else { + costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]); + costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]); + } + } + } + + /** + * A cost function for region replicas for the rack distribution. We give a relatively high + * cost to hosting replicas of the same region in the same rack. We do not prevent the case + * though. 
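To make the javadoc example above concrete: for a group whose sorted primary ids are, say, {7, 7, 9, 9, 9, 12}, the replica groups have sizes 2, 3 and 1, so costPerGroup() yields (2-1)^2 + (3-1)^2 + (1-1)^2 = 5. A standalone restatement of the same sliding-group computation, for illustration only (not the patch code):

// Counts co-located replicas in a sorted array of primary region ids.
static long costPerGroupExample(int[] sortedPrimaries) {
  long cost = 0;
  int groupStart = 0;
  for (int j = 1; j <= sortedPrimaries.length; j++) {
    if (j == sortedPrimaries.length || sortedPrimaries[j] != sortedPrimaries[groupStart]) {
      long extraReplicas = j - groupStart - 1;   // replicas beyond the first one
      cost += extraReplicas * extraReplicas;
      groupStart = j;
    }
  }
  return cost;
}

// costPerGroupExample(new int[] {7, 7, 9, 9, 9, 12}) == 1 + 4 + 0 == 5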
+ */ + public static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction { + private static final String REGION_REPLICA_RACK_COST_KEY = + "hbase.master.balancer.stochastic.regionReplicaRackCostKey"; + private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000; + + public RegionReplicaRackCostFunction(Configuration conf) { + super(conf); + this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY)); + } + + @Override + void init(Cluster cluster) { + this.cluster = cluster; + if (cluster.numRacks <= 1) { + maxCost = 0; + return; // disabled for 1 rack + } + // max cost is the case where every region replica is hosted together regardless of rack + maxCost = getMaxCost(cluster); + costsPerGroup = new long[cluster.numRacks]; + for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) { + costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]); + } + } + + @Override + protected void regionMoved(int region, int oldServer, int newServer) { + if (maxCost <= 0) { + return; // no need to compute + } + int oldRack = cluster.serverIndexToRackIndex[oldServer]; + int newRack = cluster.serverIndexToRackIndex[newServer]; + if (newRack != oldRack) { + costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]); + costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]); + } + } + } + /** * Compute the cost of total memstore size. The more unbalanced the higher the * computed cost will be. This uses a rolling average of regionload. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java index a381cb9cb2e..05de95895d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java @@ -36,8 +36,10 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.catalog.CatalogTracker; @@ -123,60 +125,60 @@ public class TestMasterOperationsForRegionReplicas { assert (state != null); } } - // TODO: HBASE-10351 should uncomment the following tests (since the tests assume region placements are handled) -// List metaRows = MetaReader.fullScan(ct); -// int numRows = 0; -// for (Result result : metaRows) { -// RegionLocations locations = MetaReader.getRegionLocations(result); -// HRegionInfo hri = locations.getRegionLocation().getRegionInfo(); -// if (!hri.getTable().equals(table)) continue; -// numRows += 1; -// HRegionLocation[] servers = locations.getRegionLocations(); -// // have two locations for the replicas of a region, and the locations should be different -// assert(servers.length == 2); -// assert(!servers[0].equals(servers[1])); -// } -// assert(numRows == numRegions); -// -// // The same verification of the meta as above but with the SnapshotOfRegionAssignmentFromMeta -// // class -// validateFromSnapshotFromMeta(table, 
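Both replica penalties are ordinary cost-function multipliers, so their weight relative to locality, skew and move costs can be tuned through configuration. A sketch using the two keys defined above (the defaults in this patch are 100000 for hosts and 10000 for racks; the values below are arbitrary examples):

// Illustration only: strengthening the replica co-location penalties.
Configuration conf = HBaseConfiguration.create();
conf.setFloat("hbase.master.balancer.stochastic.regionReplicaHostCostKey", 200000f);
conf.setFloat("hbase.master.balancer.stochastic.regionReplicaRackCostKey", 20000f);
loadBalancer.setConf(conf);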
numRegions, numReplica, ct); -// -// // Now kill the master, restart it and see if the assignments are kept -// ServerName master = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster(); -// TEST_UTIL.getHBaseClusterInterface().stopMaster(master); -// TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(master, 30000); -// TEST_UTIL.getHBaseClusterInterface().startMaster(master.getHostname()); -// TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster(); -// for (int i = 0; i < numRegions; i++) { -// for (int j = 0; j < numReplica; j++) { -// HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j); -// RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() -// .getRegionStates().getRegionState(replica); -// assert (state != null); -// } -// } -// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct); -// -// // Now shut the whole cluster down, and verify the assignments are kept so that the -// // availability constraints are met. -// TEST_UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", true); -// TEST_UTIL.shutdownMiniHBaseCluster(); -// TEST_UTIL.startMiniHBaseCluster(1, numSlaves); -// TEST_UTIL.waitTableEnabled(table.getName()); -// ct = new CatalogTracker(TEST_UTIL.getConfiguration()); -// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct); -// -// // Now shut the whole cluster down, and verify regions are assigned even if there is only -// // one server running -// TEST_UTIL.shutdownMiniHBaseCluster(); -// TEST_UTIL.startMiniHBaseCluster(1, 1); -// TEST_UTIL.waitTableEnabled(table.getName()); -// ct = new CatalogTracker(TEST_UTIL.getConfiguration()); -// validateSingleRegionServerAssignment(ct, numRegions, numReplica); -// for (int i = 1; i < numSlaves; i++) { //restore the cluster -// TEST_UTIL.getMiniHBaseCluster().startRegionServer(); -// } + + List metaRows = MetaReader.fullScan(ct); + int numRows = 0; + for (Result result : metaRows) { + RegionLocations locations = MetaReader.getRegionLocations(result); + HRegionInfo hri = locations.getRegionLocation().getRegionInfo(); + if (!hri.getTable().equals(table)) continue; + numRows += 1; + HRegionLocation[] servers = locations.getRegionLocations(); + // have two locations for the replicas of a region, and the locations should be different + assert(servers.length == 2); + assert(!servers[0].equals(servers[1])); + } + assert(numRows == numRegions); + + // The same verification of the meta as above but with the SnapshotOfRegionAssignmentFromMeta + // class + validateFromSnapshotFromMeta(TEST_UTIL, table, numRegions, numReplica, ct); + + // Now kill the master, restart it and see if the assignments are kept + ServerName master = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster(); + TEST_UTIL.getHBaseClusterInterface().stopMaster(master); + TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(master, 30000); + TEST_UTIL.getHBaseClusterInterface().startMaster(master.getHostname()); + TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster(); + for (int i = 0; i < numRegions; i++) { + for (int j = 0; j < numReplica; j++) { + HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j); + RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionStates().getRegionState(replica); + assert (state != null); + } + } + validateFromSnapshotFromMeta(TEST_UTIL, table, numRegions, numReplica, ct); + + // Now shut the whole cluster down, and 
verify the assignments are kept so that the + // availability constraints are met. + TEST_UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", true); + TEST_UTIL.shutdownMiniHBaseCluster(); + TEST_UTIL.startMiniHBaseCluster(1, numSlaves); + TEST_UTIL.waitTableEnabled(table.getName()); + ct = new CatalogTracker(TEST_UTIL.getConfiguration()); + validateFromSnapshotFromMeta(TEST_UTIL, table, numRegions, numReplica, ct); + + // Now shut the whole cluster down, and verify regions are assigned even if there is only + // one server running + TEST_UTIL.shutdownMiniHBaseCluster(); + TEST_UTIL.startMiniHBaseCluster(1, 1); + TEST_UTIL.waitTableEnabled(table.getName()); + ct = new CatalogTracker(TEST_UTIL.getConfiguration()); + validateSingleRegionServerAssignment(ct, numRegions, numReplica); + for (int i = 1; i < numSlaves; i++) { //restore the cluster + TEST_UTIL.getMiniHBaseCluster().startRegionServer(); + } //check on alter table admin.disableTable(table); @@ -288,7 +290,7 @@ public class TestMasterOperationsForRegionReplicas { assert(count.get() == numRegions); } - private void validateFromSnapshotFromMeta(TableName table, int numRegions, + private void validateFromSnapshotFromMeta(HBaseTestingUtility util, TableName table, int numRegions, int numReplica, CatalogTracker ct) throws IOException { SnapshotOfRegionAssignmentFromMeta snapshot = new SnapshotOfRegionAssignmentFromMeta(ct); snapshot.initialize(); @@ -296,6 +298,9 @@ public class TestMasterOperationsForRegionReplicas { assert(regionToServerMap.size() == numRegions * numReplica + 1); //'1' for the namespace Map> serverToRegionMap = snapshot.getRegionServerToRegionMap(); for (Map.Entry> entry : serverToRegionMap.entrySet()) { + if (entry.getKey().equals(util.getHBaseCluster().getMaster().getServerName())) { + continue; + } List regions = entry.getValue(); Set setOfStartKeys = new HashSet(); for (HRegionInfo region : regions) { @@ -307,7 +312,7 @@ public class TestMasterOperationsForRegionReplicas { } // the number of startkeys will be equal to the number of regions hosted in each server // (each server will be hosting one replica of a region) - assertEquals(setOfStartKeys.size() , numRegions); + assertEquals(numRegions, setOfStartKeys.size()); } } @@ -316,9 +321,14 @@ public class TestMasterOperationsForRegionReplicas { SnapshotOfRegionAssignmentFromMeta snapshot = new SnapshotOfRegionAssignmentFromMeta(ct); snapshot.initialize(); Map regionToServerMap = snapshot.getRegionToRegionServerMap(); - assert(regionToServerMap.size() == numRegions * numReplica + 1); //'1' for the namespace + assertEquals(regionToServerMap.size(), numRegions * numReplica + 1); //'1' for the namespace Map> serverToRegionMap = snapshot.getRegionServerToRegionMap(); - assert(serverToRegionMap.keySet().size() == 1); - assert(serverToRegionMap.values().iterator().next().size() == numRegions * numReplica + 1); + assertEquals(serverToRegionMap.keySet().size(), 2); // 1 rs + 1 master + for (Map.Entry> entry : serverToRegionMap.entrySet()) { + if (entry.getKey().equals(TEST_UTIL.getHBaseCluster().getMaster().getServerName())) { + continue; + } + assertEquals(entry.getValue().size(), numRegions * numReplica); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java index a7d678dbe71..7216abd95d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java @@ -21,20 +21,26 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Queue; import java.util.Random; +import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; /** * Class used to be the base of unit tests on load balancers. It gives helper @@ -80,6 +86,50 @@ public class BalancerTestBase { } } + /** + * Checks whether region replicas are not hosted on the same host. + */ + public void assertRegionReplicaPlacement(Map> serverMap, RackManager rackManager) { + TreeMap> regionsPerHost = new TreeMap>(); + TreeMap> regionsPerRack = new TreeMap>(); + + for (Entry> entry : serverMap.entrySet()) { + String hostname = entry.getKey().getHostname(); + Set infos = regionsPerHost.get(hostname); + if (infos == null) { + infos = new HashSet(); + regionsPerHost.put(hostname, infos); + } + + for (HRegionInfo info : entry.getValue()) { + HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); + if (!infos.add(primaryInfo)) { + Assert.fail("Two or more region replicas are hosted on the same host after balance"); + } + } + } + + if (rackManager == null) { + return; + } + + for (Entry> entry : serverMap.entrySet()) { + String rack = rackManager.getRack(entry.getKey()); + Set infos = regionsPerRack.get(rack); + if (infos == null) { + infos = new HashSet(); + regionsPerRack.put(rack, infos); + } + + for (HRegionInfo info : entry.getValue()) { + HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); + if (!infos.add(primaryInfo)) { + Assert.fail("Two or more region replicas are hosted on the same rack after balance"); + } + } + } + } + protected String printStats(List servers) { int numServers = servers.size(); int totalRegions = 0; @@ -159,18 +209,18 @@ public class BalancerTestBase { map.put(sn, sal); } - protected Map> mockClusterServers(int[] mockCluster) { + protected TreeMap> mockClusterServers(int[] mockCluster) { return mockClusterServers(mockCluster, -1); } protected BaseLoadBalancer.Cluster mockCluster(int[] mockCluster) { return new BaseLoadBalancer.Cluster(null, - mockClusterServers(mockCluster, -1), null, null, null, null); + mockClusterServers(mockCluster, -1), null, null, null, null, null); } - protected Map> mockClusterServers(int[] mockCluster, int numTables) { + protected TreeMap> mockClusterServers(int[] mockCluster, int numTables) { int numServers = mockCluster.length; - Map> servers = new TreeMap>(); + TreeMap> servers = new TreeMap>(); for (int i = 0; i < numServers; i++) { int numRegions = mockCluster[i]; ServerAndLoad sal = randomServer(0); @@ -218,7 +268,7 @@ public class BalancerTestBase { ServerName sn = this.serverQueue.poll(); return new ServerAndLoad(sn, numRegionsPerServer); } - String host = "srv" + rand.nextInt(100000); + String host = "srv" + rand.nextInt(Integer.MAX_VALUE); int port = rand.nextInt(60000); 
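assertRegionReplicaPlacement() above collapses every replica to its default (primary) HRegionInfo and fails as soon as the same primary is seen twice on one host, and again per rack when a RackManager is supplied. A hypothetical test fragment showing how it is meant to be called (loadBalancer, createServerMap and rackManager are assumed to come from the surrounding test code):

// Hypothetical usage; everything except assertRegionReplicaPlacement is assumed context.
Map<ServerName, List<HRegionInfo>> serverMap =
    createServerMap(5 /* nodes */, 100 /* regions */, 20 /* per server */, 3 /* replicas */, 2 /* tables */);
List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
// ... apply the returned plans to serverMap ...
assertRegionReplicaPlacement(serverMap, null);         // host-level check only
assertRegionReplicaPlacement(serverMap, rackManager);  // also check per-rack placement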
long startCode = rand.nextLong(); ServerName sn = ServerName.valueOf(host, port, startCode); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java index 7bd0b71d697..0507e26d167 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -46,6 +47,10 @@ import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.master.RackManager; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -57,8 +62,11 @@ import com.google.common.collect.Lists; public class TestBaseLoadBalancer extends BalancerTestBase { private static LoadBalancer loadBalancer; - private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class); + private static final Log LOG = LogFactory.getLog(TestBaseLoadBalancer.class); private static final ServerName master = ServerName.valueOf("fake-master", 0, 1L); + private static RackManager rackManager; + private static final int NUM_SERVERS = 15; + private static ServerName[] servers = new ServerName[NUM_SERVERS]; int[][] regionsAndServersMocks = new int[][] { // { num regions, num servers } @@ -75,6 +83,21 @@ public class TestBaseLoadBalancer extends BalancerTestBase { MasterServices st = Mockito.mock(MasterServices.class); Mockito.when(st.getServerName()).thenReturn(master); loadBalancer.setMasterServices(st); + + // Set up the rack topologies (5 machines per rack) + rackManager = Mockito.mock(RackManager.class); + for (int i = 0; i < NUM_SERVERS; i++) { + servers[i] = ServerName.valueOf("foo"+i+":1234",-1); + if (i < 5) { + Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack1"); + } + if (i >= 5 && i < 10) { + Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack2"); + } + if (i >= 10) { + Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack3"); + } + } } public static class MockBalancer extends BaseLoadBalancer { @@ -214,6 +237,138 @@ public class TestBaseLoadBalancer extends BalancerTestBase { assertRetainedAssignment(existing, listOfServerNames, assignment); } + @Test + public void testRegionAvailability() throws Exception { + // Create a cluster with a few servers, assign them to specific racks + // then assign some regions. 
The tests should check whether moving a + // replica from one node to a specific other node or rack lowers the + // availability of the region or not + + List list0 = new ArrayList(); + List list1 = new ArrayList(); + List list2 = new ArrayList(); + // create a region (region1) + HRegionInfo hri1 = new HRegionInfo( + TableName.valueOf("table"), "key1".getBytes(), "key2".getBytes(), + false, 100); + // create a replica of the region (replica_of_region1) + HRegionInfo hri2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1); + // create a second region (region2) + HRegionInfo hri3 = new HRegionInfo( + TableName.valueOf("table"), "key2".getBytes(), "key3".getBytes(), + false, 101); + list0.add(hri1); //only region1 + list1.add(hri2); //only replica_of_region1 + list2.add(hri3); //only region2 + Map> clusterState = + new LinkedHashMap>(); + clusterState.put(servers[0], list0); //servers[0] hosts region1 + clusterState.put(servers[1], list1); //servers[1] hosts replica_of_region1 + clusterState.put(servers[2], list2); //servers[2] hosts region2 + // create a cluster with the above clusterState. The way in which the + // cluster is created (constructor code) would make sure the indices of + // the servers are in the order in which it is inserted in the clusterState + // map (linkedhashmap is important). A similar thing applies to the region lists + Cluster cluster = new Cluster(master, clusterState, null, null, null, null, rackManager); + // check whether a move of region1 from servers[0] to servers[1] would lower + // the availability of region1 + assertTrue(cluster.wouldLowerAvailability(hri1, servers[1])); + // check whether a move of region1 from servers[0] to servers[2] would lower + // the availability of region1 + assertTrue(!cluster.wouldLowerAvailability(hri1, servers[2])); + // check whether a move of replica_of_region1 from servers[0] to servers[2] would lower + // the availability of replica_of_region1 + assertTrue(!cluster.wouldLowerAvailability(hri2, servers[2])); + // check whether a move of region2 from servers[0] to servers[1] would lower + // the availability of region2 + assertTrue(!cluster.wouldLowerAvailability(hri3, servers[1])); + + // now lets have servers[1] host replica_of_region2 + list1.add(RegionReplicaUtil.getRegionInfoForReplica(hri3, 1)); + // create a new clusterState with the above change + cluster = new Cluster(master, clusterState, null, null, null, null, rackManager); + // now check whether a move of a replica from servers[0] to servers[1] would lower + // the availability of region2 + assertTrue(cluster.wouldLowerAvailability(hri3, servers[1])); + + // start over again + clusterState.clear(); + clusterState.put(servers[0], list0); //servers[0], rack1 hosts region1 + clusterState.put(servers[5], list1); //servers[5], rack2 hosts replica_of_region1 and replica_of_region2 + clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2 + clusterState.put(servers[10], new ArrayList()); //servers[10], rack3 hosts no region + // create a cluster with the above clusterState + cluster = new Cluster(master, clusterState, null, null, null, null, rackManager); + // check whether a move of region1 from servers[0],rack1 to servers[6],rack2 would + // lower the availability + + assertTrue(cluster.wouldLowerAvailability(hri1, servers[0])); + + // now create a cluster without the rack manager + cluster = new Cluster(master, clusterState, null, null, null, null, null); + // now repeat check whether a move of region1 from servers[0] to servers[6] would + // lower the 
availability + assertTrue(!cluster.wouldLowerAvailability(hri1, servers[6])); + } + + @Test + public void testRegionAvailabilityWithRegionMoves() throws Exception { + List list0 = new ArrayList(); + List list1 = new ArrayList(); + List list2 = new ArrayList(); + // create a region (region1) + HRegionInfo hri1 = new HRegionInfo( + TableName.valueOf("table"), "key1".getBytes(), "key2".getBytes(), + false, 100); + // create a replica of the region (replica_of_region1) + HRegionInfo hri2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1); + // create a second region (region2) + HRegionInfo hri3 = new HRegionInfo( + TableName.valueOf("table"), "key2".getBytes(), "key3".getBytes(), + false, 101); + list0.add(hri1); //only region1 + list1.add(hri2); //only replica_of_region1 + list2.add(hri3); //only region2 + Map> clusterState = + new LinkedHashMap>(); + clusterState.put(servers[0], list0); //servers[0] hosts region1 + clusterState.put(servers[1], list1); //servers[1] hosts replica_of_region1 + clusterState.put(servers[2], list2); //servers[2] hosts region2 + // create a cluster with the above clusterState. The way in which the + // cluster is created (constructor code) would make sure the indices of + // the servers are in the order in which it is inserted in the clusterState + // map (linkedhashmap is important). + Cluster cluster = new Cluster(master, clusterState, null, null, null, null, rackManager); + // check whether moving region1 from servers[1] to servers[2] would lower availability + assertTrue(!cluster.wouldLowerAvailability(hri1, servers[2])); + + // now move region1 from servers[0] to servers[2] + cluster.doAction(new MoveRegionAction(0, 0, 2)); + // now repeat check whether moving region1 from servers[1] to servers[2] + // would lower availability + assertTrue(cluster.wouldLowerAvailability(hri1, servers[2])); + + // start over again + clusterState.clear(); + List list3 = new ArrayList(); + HRegionInfo hri4 = RegionReplicaUtil.getRegionInfoForReplica(hri3, 1); + list3.add(hri4); + clusterState.put(servers[0], list0); //servers[0], rack1 hosts region1 + clusterState.put(servers[5], list1); //servers[5], rack2 hosts replica_of_region1 + clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2 + clusterState.put(servers[12], list3); //servers[12], rack3 hosts replica_of_region2 + // create a cluster with the above clusterState + cluster = new Cluster(master, clusterState, null, null, null, null, rackManager); + // check whether a move of replica_of_region2 from servers[12],rack3 to servers[0],rack1 would + // lower the availability + assertTrue(!cluster.wouldLowerAvailability(hri4, servers[0])); + // now move region2 from servers[6],rack2 to servers[0],rack1 + cluster.doAction(new MoveRegionAction(2, 2, 0)); + // now repeat check if replica_of_region2 from servers[12],rack3 to servers[0],rack1 would + // lower the availability + assertTrue(cluster.wouldLowerAvailability(hri3, servers[0])); + } + private List getListOfServerNames(final List sals) { List list = new ArrayList(); for (ServerAndLoad e : sals) { @@ -289,7 +444,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase { assignRegions(regions, oldServers, clusterState); // should not throw exception: - BaseLoadBalancer.Cluster cluster = new Cluster(null, clusterState, null, null, null, null); + BaseLoadBalancer.Cluster cluster = new Cluster(null, clusterState, null, null, null, null, null); assertEquals(101 + 9, cluster.numRegions); assertEquals(10, cluster.numServers); // only 10 servers because they 
share the same host + port } @@ -331,7 +486,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase { when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn( Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus - BaseLoadBalancer.Cluster cluster = new Cluster(null, clusterState, null, locationFinder, null, null); + BaseLoadBalancer.Cluster cluster = new Cluster(null, clusterState, null, locationFinder, null, null, null); int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0)); // this is ok, it is just a test int r1 = ArrayUtils.indexOf(cluster.regions, regions.get(1)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index e6c731994c4..6aee367891e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -17,10 +17,19 @@ */ package org.apache.hadoop.hbase.master.balancer; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Queue; import java.util.TreeMap; @@ -34,29 +43,30 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.net.NetworkTopology; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - @Category(MediumTests.class) public class TestStochasticLoadBalancer extends BalancerTestBase { public static final String REGION_KEY = "testRegion"; private static StochasticLoadBalancer loadBalancer; private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class); + private static Configuration conf; + private static final ServerName master = ServerName.valueOf("fake-master", 0, 1L); @BeforeClass public static void beforeAllTests() throws Exception { - Configuration conf = HBaseConfiguration.create(); + conf = HBaseConfiguration.create(); conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f); + conf.setClass("hbase.util.ip.to.rack.determiner", + MyRackResolver.class, DNSToSwitchMapping.class); loadBalancer = new StochasticLoadBalancer(); loadBalancer.setConf(conf); } @@ -187,22 +197,29 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { StochasticLoadBalancer.CostFunction costFunction = new StochasticLoadBalancer.RegionCountSkewCostFunction(conf, 1, 1); for (int[] 
mockCluster : clusterStateMocks) { - double cost = costFunction.cost(mockCluster(mockCluster)); + costFunction.init(mockCluster(mockCluster)); + double cost = costFunction.cost(); assertTrue(cost >= 0); assertTrue(cost <= 1.01); } + costFunction.init(mockCluster(new int[]{0, 0, 0, 0, 1})); assertEquals(1, - costFunction.cost(mockCluster(new int[]{0, 0, 0, 0, 1})), 0.01); + costFunction.cost(), 0.01); + costFunction.init(mockCluster(new int[]{0, 0, 0, 1, 1})); assertEquals(.75, - costFunction.cost(mockCluster(new int[]{0, 0, 0, 1, 1})), 0.01); + costFunction.cost(), 0.01); + costFunction.init(mockCluster(new int[]{0, 0, 1, 1, 1})); assertEquals(.5, - costFunction.cost(mockCluster(new int[]{0, 0, 1, 1, 1})), 0.01); + costFunction.cost(), 0.01); + costFunction.init(mockCluster(new int[]{0, 1, 1, 1, 1})); assertEquals(.25, - costFunction.cost(mockCluster(new int[]{0, 1, 1, 1, 1})), 0.01); + costFunction.cost(), 0.01); + costFunction.init(mockCluster(new int[]{1, 1, 1, 1, 1})); assertEquals(0, - costFunction.cost(mockCluster(new int[]{1, 1, 1, 1, 1})), 0.01); + costFunction.cost(), 0.01); + costFunction.init(mockCluster(new int[]{10, 10, 10, 10, 10})); assertEquals(0, - costFunction.cost(mockCluster(new int[]{10, 10, 10, 10, 10})), 0.01); + costFunction.cost(), 0.01); } @Test @@ -212,7 +229,8 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf); for (int[] mockCluster : clusterStateMocks) { BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster); - double cost = costFunction.cost(cluster); + costFunction.init(cluster); + double cost = costFunction.cost(); assertTrue(cost >= 0); assertTrue(cost <= 1.01); } @@ -250,10 +268,11 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numNodes = 3; int numRegions = 20; int numRegionsPerServer = 3; //all servers except one + int replication = 1; int numTables = 2; Map> serverMap = - createServerMap(numNodes, numRegions, numRegionsPerServer, numTables); + createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables); List list = convertToList(serverMap); @@ -275,13 +294,103 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { assertNull(plans); } + @Test + public void testReplicaCost() { + Configuration conf = HBaseConfiguration.create(); + StochasticLoadBalancer.CostFunction + costFunction = new StochasticLoadBalancer.RegionReplicaHostCostFunction(conf); + for (int[] mockCluster : clusterStateMocks) { + BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster); + costFunction.init(cluster); + double cost = costFunction.cost(); + assertTrue(cost >= 0); + assertTrue(cost <= 1.01); + } + } + + @Test + public void testReplicaCostForReplicas() { + Configuration conf = HBaseConfiguration.create(); + StochasticLoadBalancer.CostFunction + costFunction = new StochasticLoadBalancer.RegionReplicaHostCostFunction(conf); + + int [] servers = new int[] {3,3,3,3,3}; + TreeMap> clusterState = mockClusterServers(servers); + + BaseLoadBalancer.Cluster cluster; + + cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null, null); + costFunction.init(cluster); + double costWithoutReplicas = costFunction.cost(); + assertEquals(0, costWithoutReplicas, 0); + + // replicate the region from first server to the last server + HRegionInfo replica1 = RegionReplicaUtil.getRegionInfoForReplica( + clusterState.firstEntry().getValue().get(0),1); + clusterState.lastEntry().getValue().add(replica1); + + cluster = 
new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null, null); + costFunction.init(cluster); + double costWith1ReplicaDifferentServer = costFunction.cost(); + + assertEquals(0, costWith1ReplicaDifferentServer, 0); + + // add a third replica to the last server + HRegionInfo replica2 = RegionReplicaUtil.getRegionInfoForReplica(replica1, 2); + clusterState.lastEntry().getValue().add(replica2); + + cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null, null); + costFunction.init(cluster); + double costWith1ReplicaSameServer = costFunction.cost(); + + assertTrue(costWith1ReplicaDifferentServer < costWith1ReplicaSameServer); + + // test with replication = 4 for following: + + HRegionInfo replica3; + Iterator>> it; + Entry> entry; + + clusterState = mockClusterServers(servers); + it = clusterState.entrySet().iterator(); + entry = it.next(); //first server + HRegionInfo hri = entry.getValue().get(0); + replica1 = RegionReplicaUtil.getRegionInfoForReplica(hri, 1); + replica2 = RegionReplicaUtil.getRegionInfoForReplica(hri, 2); + replica3 = RegionReplicaUtil.getRegionInfoForReplica(hri, 3); + entry.getValue().add(replica1); + entry.getValue().add(replica2); + it.next().getValue().add(replica3); //2nd server + + cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null, null); + costFunction.init(cluster); + double costWith3ReplicasSameServer = costFunction.cost(); + + clusterState = mockClusterServers(servers); + hri = clusterState.firstEntry().getValue().get(0); + replica1 = RegionReplicaUtil.getRegionInfoForReplica(hri, 1); + replica2 = RegionReplicaUtil.getRegionInfoForReplica(hri, 2); + replica3 = RegionReplicaUtil.getRegionInfoForReplica(hri, 3); + + clusterState.firstEntry().getValue().add(replica1); + clusterState.lastEntry().getValue().add(replica2); + clusterState.lastEntry().getValue().add(replica3); + + cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null, null); + costFunction.init(cluster); + double costWith2ReplicasOnTwoServers = costFunction.cost(); + + assertTrue(costWith2ReplicasOnTwoServers < costWith3ReplicasSameServer); + } + @Test (timeout = 60000) public void testSmallCluster() { int numNodes = 10; int numRegions = 1000; int numRegionsPerServer = 40; //all servers except one + int replication = 1; int numTables = 10; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); } @Test (timeout = 60000) @@ -289,8 +398,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numNodes = 20; int numRegions = 2000; int numRegionsPerServer = 40; //all servers except one + int replication = 1; int numTables = 10; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); } @Test (timeout = 60000) @@ -298,8 +408,10 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numNodes = 20; int numRegions = 2000; int numRegionsPerServer = 1; // all servers except one + int replication = 1; int numTables = 10; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, false /* max moves */); + /* fails because of max moves */ + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, false); } @Test (timeout = 800000) @@ -307,8 +419,9 @@ public class TestStochasticLoadBalancer 
extends BalancerTestBase { int numNodes = 100; int numRegions = 10000; int numRegionsPerServer = 60; // all servers except one + int replication = 1; int numTables = 40; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); } @Test (timeout = 800000) @@ -316,12 +429,15 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numNodes = 200; int numRegions = 100000; int numRegionsPerServer = 40; // all servers except one + int replication = 1; int numTables = 400; testWithCluster(numNodes, numRegions, numRegionsPerServer, + replication, numTables, - false /* num large num regions means may not always get to best balance with one run */); + false, /* num large num regions means may not always get to best balance with one run */ + false); } @@ -330,8 +446,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numNodes = 100; int numRegions = 2000; int numRegionsPerServer = 9; // all servers except one + int replication = 1; int numTables = 110; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); // TODO(eclark): Make sure that the tables are well distributed. } @@ -341,20 +458,145 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numRegions = 100000; //100 regions per RS int numRegionsPerServer = 80; //all servers except one int numTables = 100; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); + int replication = 1; + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); + } + + @Test (timeout = 60000) + public void testRegionReplicasOnSmallCluster() { + int numNodes = 10; + int numRegions = 1000; + int replication = 3; // 3 replicas per region + int numRegionsPerServer = 80; //all regions are mostly balanced + int numTables = 10; + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); + } + + @Test (timeout = 60000) + public void testRegionReplicasOnMidCluster() { + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + loadBalancer.setConf(conf); + int numNodes = 200; + int numRegions = 40 * 200; + int replication = 3; // 3 replicas per region + int numRegionsPerServer = 30; //all regions are mostly balanced + int numTables = 10; + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); + } + + @Test (timeout = 60000) + public void testRegionReplicasOnLargeCluster() { + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + loadBalancer.setConf(conf); + int numNodes = 1000; + int numRegions = 40 * numNodes; //40 regions per RS + int numRegionsPerServer = 30; //all servers except one + int numTables = 100; + int replication = 3; + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); + } + + @Test (timeout = 60000) + public void testRegionReplicasOnMidClusterHighReplication() { + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L); + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + loadBalancer.setConf(conf); + int numNodes = 100; + int numRegions = 6 * 100; + int replication = 100; // 100 replicas per region, one for each server + int numRegionsPerServer = 5; + int numTables = 10; + testWithCluster(numNodes, numRegions, 
numRegionsPerServer, replication, numTables, true, true); + } + + @Test (timeout = 60000) + public void testRegionReplicationOnMidClusterSameHosts() { + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L); + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + loadBalancer.setConf(conf); + int numHosts = 100; + int numRegions = 100 * 100; + int replication = 3; // 3 replicas per region + int numRegionsPerServer = 5; + int numTables = 10; + Map> serverMap = + createServerMap(numHosts, numRegions, numRegionsPerServer, replication, numTables); + int numNodesPerHost = 4; + + // create a new map with 4 RS per host. + Map> newServerMap = new TreeMap>(serverMap); + for (Map.Entry> entry : serverMap.entrySet()) { + for (int i=1; i < numNodesPerHost; i++) { + ServerName s1 = entry.getKey(); + ServerName s2 = ServerName.valueOf(s1.getHostname(), s1.getPort() + i, 1); // create an RS for the same host + newServerMap.put(s2, new ArrayList()); + } + } + + testWithCluster(newServerMap, null, true, true); + } + + private static class ForTestRackManager extends RackManager { + int numRacks; + public ForTestRackManager(int numRacks) { + this.numRacks = numRacks; + } + @Override + public String getRack(ServerName server) { + return "rack_" + (server.hashCode() % numRacks); + } + } + + @Test (timeout = 120000) + public void testRegionReplicationOnMidClusterWithRacks() { + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L); + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 60 * 1000); // 60 sec + loadBalancer.setConf(conf); + int numNodes = 50; + int numRegions = numNodes * 30; + int replication = 3; // 3 replicas per region + int numRegionsPerServer = 25; + int numTables = 10; + int numRacks = 4; // all replicas should be on a different rack + Map> serverMap = + createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables); + RackManager rm = new ForTestRackManager(numRacks); + + testWithCluster(serverMap, rm, true, true); + } + + @Test (timeout = 60000) + public void testRegionReplicationOnMidClusterReplicationGreaterThanNumNodes() { + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L); + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + loadBalancer.setConf(conf); + int numNodes = 80; + int numRegions = 6 * 100; + int replication = 100; // 100 replicas per region, more than numNodes + int numRegionsPerServer = 5; + int numTables = 10; + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, false); } protected void testWithCluster(int numNodes, - int numRegions, - int numRegionsPerServer, - int numTables, - boolean assertFullyBalanced) { + int numRegions, + int numRegionsPerServer, + int replication, + int numTables, + boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) { Map> serverMap = - createServerMap(numNodes, numRegions, numRegionsPerServer, numTables); + createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables); + testWithCluster(serverMap, null, assertFullyBalanced, assertFullyBalancedForReplicas); + } + + protected void testWithCluster(Map> serverMap, + RackManager rackManager, boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) { List list = convertToList(serverMap); LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); + loadBalancer.setRackManager(rackManager); // Run the balancer. 
List plans = loadBalancer.balanceCluster(serverMap); assertNotNull(plans); @@ -369,12 +611,16 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { assertClusterAsBalanced(balancedCluster); List secondPlans = loadBalancer.balanceCluster(serverMap); assertNull(secondPlans); + if (assertFullyBalancedForReplicas) { + assertRegionReplicaPlacement(serverMap, rackManager); + } } } private Map> createServerMap(int numNodes, int numRegions, int numRegionsPerServer, + int replication, int numTables) { //construct a cluster of numNodes, having a total of numRegions. Each RS will hold //numRegionsPerServer many regions except for the last one, which will host all the @@ -384,6 +630,40 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { cluster[i] = numRegionsPerServer; } cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer); - return mockClusterServers(cluster, numTables); + Map> clusterState = mockClusterServers(cluster, numTables); + if (replication > 0) { + // replicate the regions to the same servers + for (List regions : clusterState.values()) { + int length = regions.size(); + for (int i = 0; i < length; i++) { + for (int r = 1; r < replication ; r++) { + regions.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), r)); + } + } + } + } + + return clusterState; + } + + public static class MyRackResolver implements DNSToSwitchMapping { + + public MyRackResolver(Configuration conf) {} + + @Override + public List resolve(List names) { + List racks = new ArrayList(names.size()); + for (int i = 0; i < names.size(); i++) { + racks.add(i, NetworkTopology.DEFAULT_RACK); + } + return racks; + } + + @Override + public void reloadCachedMappings() {} + + @Override + public void reloadCachedMappings(List names) { + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java index cc2235fe19e..86e6b899b48 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java @@ -105,7 +105,7 @@ public class TestRegionReplicas { private void openRegion(HRegionInfo hri) throws Exception { ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName()); // first version is '0' - AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null); + AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null, null); AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr); Assert.assertTrue(responseOpen.getOpeningStateCount() == 1); Assert.assertTrue(responseOpen.getOpeningState(0).
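Finally, MyRackResolver above only takes effect because rack lookups go through whatever DNSToSwitchMapping is registered under hbase.util.ip.to.rack.determiner. A hypothetical variant that spreads servers over three racks by hostname hash, wired up the same way the test does (not part of the patch):

// Hypothetical DNSToSwitchMapping for tests; not part of this change.
public static class ThreeRackResolver implements DNSToSwitchMapping {

  public ThreeRackResolver(Configuration conf) {}   // needed for reflective construction

  @Override
  public List<String> resolve(List<String> names) {
    List<String> racks = new ArrayList<String>(names.size());
    for (String name : names) {
      racks.add("/rack" + (Math.abs(name.hashCode()) % 3));
    }
    return racks;
  }

  @Override
  public void reloadCachedMappings() {}

  @Override
  public void reloadCachedMappings(List<String> names) {}
}

// Wiring, mirroring beforeAllTests() above:
// conf.setClass("hbase.util.ip.to.rack.determiner", ThreeRackResolver.class, DNSToSwitchMapping.class);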