HBASE-12122 Try not to assign user regions to master all the time

This commit is contained in:
Jimmy Xiang 2014-09-30 10:59:15 -07:00
parent 0a3c24f601
commit 16228c275d
13 changed files with 187 additions and 208 deletions

View File

@ -258,6 +258,7 @@ public final class ProtobufUtil {
* @return True if passed <code>bytes</code> has {@link #PB_MAGIC} for a prefix.
*/
public static boolean isPBMagicPrefix(final byte [] bytes) {
if (bytes == null) return false;
return isPBMagicPrefix(bytes, 0, bytes.length);
}

View File

@ -2947,12 +2947,15 @@ public class AssignmentManager extends ZooKeeperListener {
ServerName serverName = regionState.getServerName();
ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
try {
if (!regionState.equals(regionStates.getRegionState(hri))) {
return; // Region is not in the expected state any more
}
while (serverManager.isServerOnline(serverName)
&& !server.isStopped() && !server.isAborted()) {
for (int i = 1; i <= maximumAttempts; i++) {
if (!serverManager.isServerOnline(serverName)
|| server.isStopped() || server.isAborted()) {
return; // No need any more
}
try {
if (!regionState.equals(regionStates.getRegionState(hri))) {
return; // Region is not in the expected state any more
}
List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
if (shouldAssignRegionsWithFavoredNodes) {
favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
@ -3005,12 +3008,15 @@ public class AssignmentManager extends ZooKeeperListener {
ServerName serverName = regionState.getServerName();
ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
try {
if (!regionState.equals(regionStates.getRegionState(hri))) {
return; // Region is not in the expected state any more
}
while (serverManager.isServerOnline(serverName)
&& !server.isStopped() && !server.isAborted()) {
for (int i = 1; i <= maximumAttempts; i++) {
if (!serverManager.isServerOnline(serverName)
|| server.isStopped() || server.isAborted()) {
return; // No need any more
}
try {
if (!regionState.equals(regionStates.getRegionState(hri))) {
return; // Region is not in the expected state any more
}
if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
// This means the region is still on the target server
LOG.debug("Got false in retry sendRegionClose for "

View File

@ -1252,8 +1252,10 @@ public class MasterRpcServices extends RSRpcServices
RegionStateTransition rt = req.getTransition(0);
TableName tableName = ProtobufUtil.toTableName(
rt.getRegionInfo(0).getTableName());
if (!TableName.META_TABLE_NAME.equals(tableName)
&& !master.assignmentManager.isFailoverCleanupDone()) {
RegionStates regionStates = master.assignmentManager.getRegionStates();
if (!(TableName.META_TABLE_NAME.equals(tableName)
&& regionStates.getRegionState(HRegionInfo.FIRST_META_REGIONINFO) != null)
&& !master.assignmentManager.isFailoverCleanupDone()) {
// Meta region is assigned before master finishes the
// failover cleanup. So no need this check for it
throw new PleaseHoldException("Master is rebuilding user regions");

View File

@ -444,7 +444,7 @@ public class ServerManager {
}
/** @return the count of active regionservers */
private int countOfRegionServers() {
public int countOfRegionServers() {
// Presumes onlineServers is a concurrent map
return this.onlineServers.size();
}

View File

@ -89,8 +89,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
* topology in terms of server names, hostnames and racks.
*/
protected static class Cluster {
ServerName masterServerName;
Set<String> tablesOnMaster;
ServerName[] servers;
String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host
String[] racks;
@ -99,7 +97,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
ArrayList<String> tables;
HRegionInfo[] regions;
Deque<RegionLoad>[] regionLoads;
int activeMasterIndex = -1;
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
@ -120,7 +117,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
int numUserRegionsOnMaster; //number of user regions on the active master
int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
boolean hasRegionReplicas = false; //whether there is regions with replicas
@ -140,40 +136,32 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int numMovedRegions = 0; //num moved regions from the initial configuration
// num of moved regions away from master that should be on the master
int numMovedMasterHostedRegions = 0;
int numMovedMetaRegions = 0; //num of moved regions that are META
Map<ServerName, List<HRegionInfo>> clusterState;
protected final RackManager rackManager;
protected Cluster(
ServerName masterServerName,
Map<ServerName, List<HRegionInfo>> clusterState,
Map<String, Deque<RegionLoad>> loads,
RegionLocationFinder regionFinder,
Set<String> tablesOnMaster,
RackManager rackManager) {
this(masterServerName, null, clusterState, loads, regionFinder,
tablesOnMaster, rackManager);
this(null, clusterState, loads, regionFinder,
rackManager);
}
@SuppressWarnings("unchecked")
protected Cluster(
ServerName masterServerName,
Collection<HRegionInfo> unassignedRegions,
Map<ServerName, List<HRegionInfo>> clusterState,
Map<String, Deque<RegionLoad>> loads,
RegionLocationFinder regionFinder,
Set<String> tablesOnMaster,
RackManager rackManager) {
if (unassignedRegions == null) {
unassignedRegions = EMPTY_REGION_LIST;
}
this.masterServerName = masterServerName;
this.tablesOnMaster = tablesOnMaster;
serversToIndex = new HashMap<String, Integer>();
hostsToIndex = new HashMap<String, Integer>();
racksToIndex = new HashMap<String, Integer>();
@ -262,10 +250,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
if (servers[serverIndex].equals(masterServerName)) {
activeMasterIndex = serverIndex;
}
}
hosts = new String[numHosts];
@ -640,16 +624,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
regionIndexToServerIndex[region] = newServer;
if (initialRegionIndexToServerIndex[region] == newServer) {
numMovedRegions--; //region moved back to original location
if (shouldBeOnMaster(regions[region]) && isActiveMaster(newServer)) {
//Master hosted region moved back to the active master
numMovedMasterHostedRegions--;
}
} else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
numMovedRegions++; //region moved from original location
if (shouldBeOnMaster(regions[region]) && isActiveMaster(oldServer)) {
// Master hosted region moved away from active the master
numMovedMasterHostedRegions++;
}
}
int tableIndex = regionIndexToTableIndex[region];
if (oldServer >= 0) {
@ -765,15 +741,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
return regionsPerServer[server].length;
}
boolean isActiveMaster(int server) {
return activeMasterIndex == server;
}
boolean shouldBeOnMaster(HRegionInfo region) {
return tablesOnMaster != null && tablesOnMaster.contains(
region.getTable().getNameAsString());
}
boolean contains(int[] arr, int val) {
return Arrays.binarySearch(arr, val) >= 0;
}
@ -813,8 +780,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
numTables +
", numMovedRegions=" +
numMovedRegions +
", numMovedMasterHostedRegions=" +
numMovedMasterHostedRegions +
'}';
return desc;
}
@ -887,7 +852,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
*/
protected List<RegionPlan> balanceMasterRegions(
Map<ServerName, List<HRegionInfo>> clusterMap) {
if (services == null || clusterMap.size() <= 1) return null;
if (masterServerName == null
|| clusterMap == null || clusterMap.size() <= 1) return null;
List<RegionPlan> plans = null;
List<HRegionInfo> regions = clusterMap.get(masterServerName);
if (regions != null) {
@ -931,6 +897,27 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
return plans;
}
/**
* Assign the regions that should be on master regionserver.
*/
protected Map<ServerName, List<HRegionInfo>> assignMasterRegions(
Collection<HRegionInfo> regions, List<ServerName> servers) {
if (servers == null || regions == null || regions.isEmpty()) {
return null;
}
Map<ServerName, List<HRegionInfo>> assignments
= new TreeMap<ServerName, List<HRegionInfo>>();
if (masterServerName != null && servers.contains(masterServerName)) {
assignments.put(masterServerName, new ArrayList<HRegionInfo>());
for (HRegionInfo region: regions) {
if (shouldBeOnMaster(region)) {
assignments.get(masterServerName).add(region);
}
}
}
return assignments;
}
@Override
public Configuration getConf() {
return this.config;
@ -954,8 +941,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
protected boolean needsBalance(Cluster c) {
ClusterLoadState cs = new ClusterLoadState(
masterServerName, c.clusterState);
ClusterLoadState cs = new ClusterLoadState(c.clusterState);
if (cs.getNumServers() < MIN_SERVER_BALANCE) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not running balancer because only " + cs.getNumServers()
@ -1016,8 +1002,21 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
List<ServerName> servers) {
metricsBalancer.incrMiscInvocations();
Map<ServerName, List<HRegionInfo>> assignments = assignMasterRegions(regions, servers);
if (assignments != null && !assignments.isEmpty()) {
servers = new ArrayList<ServerName>(servers);
// Guarantee not to put other regions on master
servers.remove(masterServerName);
List<HRegionInfo> masterRegions = assignments.get(masterServerName);
if (!masterRegions.isEmpty()) {
regions = new ArrayList<HRegionInfo>(regions);
for (HRegionInfo region: masterRegions) {
regions.remove(region);
}
}
}
if (regions == null || regions.isEmpty()) {
return null;
return assignments;
}
int numServers = servers == null ? 0 : servers.size();
@ -1031,29 +1030,18 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
// generator for AssignRegionAction. The LB will ensure the regions are mostly local
// and balanced. This should also run fast with fewer number of iterations.
Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
if (numServers == 1) { // Only one server, nothing fancy we can do here
ServerName server = servers.get(0);
assignments.put(server, new ArrayList<HRegionInfo>(regions));
return assignments;
}
List<HRegionInfo> masterRegions = null;
if (servers.contains(masterServerName)) {
masterRegions = new ArrayList<HRegionInfo>();
}
Cluster cluster = createCluster(servers, regions, tablesOnMaster);
Cluster cluster = createCluster(servers, regions);
List<HRegionInfo> unassignedRegions = new ArrayList<HRegionInfo>();
roundRobinAssignment(cluster, regions, unassignedRegions,
servers, masterRegions, assignments);
servers, assignments);
if (masterRegions != null && !masterRegions.isEmpty()) {
assignments.put(masterServerName, masterRegions);
for (HRegionInfo r : masterRegions) {
cluster.doAssignRegion(r, masterServerName);
}
}
List<HRegionInfo> lastFewRegions = new ArrayList<HRegionInfo>();
// assign the remaining by going through the list and try to assign to servers one-by-one
int serverIdx = RANDOM.nextInt(numServers);
@ -1061,9 +1049,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
boolean assigned = false;
for (int j = 0; j < numServers; j++) { // try all servers one by one
ServerName serverName = servers.get((j + serverIdx) % numServers);
if (serverName.equals(masterServerName)) {
continue;
}
if (!cluster.wouldLowerAvailability(region, serverName)) {
List<HRegionInfo> serverRegions = assignments.get(serverName);
if (serverRegions == null) {
@ -1086,11 +1071,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
for (HRegionInfo region : lastFewRegions) {
int i = RANDOM.nextInt(numServers);
ServerName server = servers.get(i);
if (server.equals(masterServerName)) {
// Try to avoid master for a user region
i = (i == 0 ? 1 : i - 1);
server = servers.get(i);
}
List<HRegionInfo> serverRegions = assignments.get(server);
if (serverRegions == null) {
serverRegions = new ArrayList<HRegionInfo>();
@ -1103,7 +1083,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
protected Cluster createCluster(List<ServerName> servers,
Collection<HRegionInfo> regions, Set<String> tablesOnMaster) {
Collection<HRegionInfo> regions) {
// Get the snapshot of the current assignments for the regions in question, and then create
// a cluster out of it. Note that we might have replicas already assigned to some servers
// earlier. So we want to get the snapshot to see those assignments, but this will only contain
@ -1115,8 +1095,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
clusterState.put(server, EMPTY_REGION_LIST);
}
}
return new Cluster(masterServerName, regions, clusterState, null, this.regionFinder,
tablesOnMaster, rackManager);
return new Cluster(regions, clusterState, null, this.regionFinder,
rackManager);
}
/**
@ -1158,6 +1138,15 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
@Override
public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
metricsBalancer.incrMiscInvocations();
if (servers != null && servers.contains(masterServerName)) {
if (shouldBeOnMaster(regionInfo)) {
return masterServerName;
}
servers = new ArrayList<ServerName>(servers);
// Guarantee not to put other regions on master
servers.remove(masterServerName);
}
int numServers = servers == null ? 0 : servers.size();
if (numServers == 0) {
LOG.warn("Wanted to do retain assignment but no servers to assign to");
@ -1166,13 +1155,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
if (numServers == 1) { // Only one server, nothing fancy we can do here
return servers.get(0);
}
if (shouldBeOnMaster(regionInfo)
&& servers.contains(masterServerName)) {
return masterServerName;
}
List<HRegionInfo> regions = Lists.newArrayList(regionInfo);
Cluster cluster = createCluster(servers, regions, tablesOnMaster);
Cluster cluster = createCluster(servers, regions);
return randomAssignment(cluster, regionInfo, servers);
}
@ -1198,8 +1183,22 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
List<ServerName> servers) {
// Update metrics
metricsBalancer.incrMiscInvocations();
Map<ServerName, List<HRegionInfo>> assignments
= assignMasterRegions(regions.keySet(), servers);
if (assignments != null && !assignments.isEmpty()) {
servers = new ArrayList<ServerName>(servers);
// Guarantee not to put other regions on master
servers.remove(masterServerName);
List<HRegionInfo> masterRegions = assignments.get(masterServerName);
if (!masterRegions.isEmpty()) {
regions = new HashMap<HRegionInfo, ServerName>(regions);
for (HRegionInfo region: masterRegions) {
regions.remove(region);
}
}
}
if (regions == null || regions.isEmpty()) {
return null;
return assignments;
}
int numServers = servers == null ? 0 : servers.size();
@ -1207,7 +1206,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
LOG.warn("Wanted to do retain assignment but no servers to assign to");
return null;
}
Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
if (numServers == 1) { // Only one server, nothing fancy we can do here
ServerName server = servers.get(0);
assignments.put(server, new ArrayList<HRegionInfo>(regions.keySet()));
@ -1223,9 +1221,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
for (ServerName server : servers) {
assignments.put(server, new ArrayList<HRegionInfo>());
if (!server.equals(masterServerName)) {
serversByHostname.put(server.getHostname(), server);
}
serversByHostname.put(server.getHostname(), server);
}
// Collection of the hostnames that used to have regions
@ -1233,13 +1229,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
// after the cluster restart.
Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
// Master regionserver is in the server list.
boolean masterIncluded = servers.contains(masterServerName);
int numRandomAssignments = 0;
int numRetainedAssigments = 0;
Cluster cluster = createCluster(servers, regions.keySet(), tablesOnMaster);
Cluster cluster = createCluster(servers, regions.keySet());
for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
HRegionInfo region = entry.getKey();
@ -1248,14 +1241,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
if (oldServerName != null) {
localServers = serversByHostname.get(oldServerName.getHostname());
}
if (masterIncluded && shouldBeOnMaster(region)) {
assignments.get(masterServerName).add(region);
if (localServers.contains(masterServerName)) {
numRetainedAssigments++;
} else {
numRandomAssignments++;
}
} else if (localServers.isEmpty()) {
if (localServers.isEmpty()) {
// No servers on the new cluster match up with this hostname,
// assign randomly.
ServerName randomServer = randomAssignment(cluster, region, servers);
@ -1340,11 +1326,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
do {
int i = RANDOM.nextInt(numServers);
sn = servers.get(i);
if (sn.equals(masterServerName)) {
// Try to avoid master for a user region
i = (i == 0 ? 1 : i - 1);
sn = servers.get(i);
}
} while (cluster.wouldLowerAvailability(regionInfo, sn)
&& iterations++ < maxIterations);
cluster.doAssignRegion(regionInfo, sn);
@ -1356,16 +1337,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
*/
private void roundRobinAssignment(Cluster cluster, List<HRegionInfo> regions,
List<HRegionInfo> unassignedRegions, List<ServerName> servers,
List<HRegionInfo> masterRegions, Map<ServerName, List<HRegionInfo>> assignments) {
Map<ServerName, List<HRegionInfo>> assignments) {
boolean masterIncluded = servers.contains(masterServerName);
int numServers = servers.size();
int skipServers = numServers;
if (masterIncluded) {
skipServers--;
}
int numRegions = regions.size();
int max = (int) Math.ceil((float) numRegions / skipServers);
int max = (int) Math.ceil((float) numRegions / numServers);
int serverIdx = 0;
if (numServers > 1) {
serverIdx = RANDOM.nextInt(numServers);
@ -1374,25 +1350,15 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
for (int j = 0; j < numServers; j++) {
ServerName server = servers.get((j + serverIdx) % numServers);
if (masterIncluded && server.equals(masterServerName)) {
// Don't put non-special region on the master regionserver,
// So that it is not overloaded.
continue;
}
List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
for (int i = regionIdx; i < numRegions; i += skipServers) {
for (int i = regionIdx; i < numRegions; i += numServers) {
HRegionInfo region = regions.get(i % numRegions);
if (masterRegions == null || !shouldBeOnMaster(region)) {
if (cluster.wouldLowerAvailability(region, server)) {
unassignedRegions.add(region);
} else {
serverRegions.add(region);
cluster.doAssignRegion(region, server);
}
continue;
if (cluster.wouldLowerAvailability(region, server)) {
unassignedRegions.add(region);
} else {
serverRegions.add(region);
cluster.doAssignRegion(region, server);
}
// Master is in the list and this is a special region
masterRegions.add(region);
}
assignments.put(server, serverRegions);
regionIdx++;

View File

@ -35,19 +35,13 @@ public class ClusterLoadState {
private int numRegions = 0;
private int numServers = 0;
public ClusterLoadState(ServerName master,
Map<ServerName, List<HRegionInfo>> clusterState) {
public ClusterLoadState(Map<ServerName, List<HRegionInfo>> clusterState) {
this.numRegions = 0;
this.numServers = clusterState.size();
this.clusterState = clusterState;
serversByLoad = new TreeMap<ServerAndLoad, List<HRegionInfo>>();
// Iterate so we can count regions as we build the map
for (Map.Entry<ServerName, List<HRegionInfo>> server : clusterState.entrySet()) {
if (master != null && numServers > 1 && master.equals(server.getKey())) {
// Don't count the master since its load is meant to be low.
numServers--;
continue;
}
List<HRegionInfo> regions = server.getValue();
int sz = regions.size();
if (sz == 0) emptyRegionServerPresent = true;

View File

@ -183,19 +183,26 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
public List<RegionPlan> balanceCluster(
Map<ServerName, List<HRegionInfo>> clusterMap) {
List<RegionPlan> regionsToReturn = balanceMasterRegions(clusterMap);
if (regionsToReturn != null) {
if (regionsToReturn != null || clusterMap == null || clusterMap.size() <= 1) {
return regionsToReturn;
}
if (masterServerName != null && clusterMap.containsKey(masterServerName)) {
if (clusterMap.size() <= 2) {
return null;
}
clusterMap = new HashMap<ServerName, List<HRegionInfo>>(clusterMap);
clusterMap.remove(masterServerName);
}
boolean emptyRegionServerPresent = false;
long startTime = System.currentTimeMillis();
ClusterLoadState cs = new ClusterLoadState(masterServerName, clusterMap);
// construct a Cluster object with clusterMap and rest of the
// argument as defaults
Cluster c = new Cluster(masterServerName, clusterMap, null, this.regionFinder,
tablesOnMaster, this.rackManager);
Cluster c = new Cluster(clusterMap, null, this.regionFinder, this.rackManager);
if (!this.needsBalance(c)) return null;
ClusterLoadState cs = new ClusterLoadState(clusterMap);
int numServers = cs.getNumServers();
NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
int numRegions = cs.getNumRegions();

View File

@ -208,14 +208,21 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
@Override
public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
List<RegionPlan> plans = balanceMasterRegions(clusterState);
if (plans != null) {
if (plans != null || clusterState == null || clusterState.size() <= 1) {
return plans;
}
if (masterServerName != null && clusterState.containsKey(masterServerName)) {
if (clusterState.size() <= 2) {
return null;
}
clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState);
clusterState.remove(masterServerName);
}
//The clusterState that is given to this method contains the state
//of all the regions in the table(s) (that's true today)
// Keep track of servers to iterate through them.
Cluster cluster = new Cluster(masterServerName,
clusterState, loads, regionFinder, tablesOnMaster, rackManager);
Cluster cluster = new Cluster(clusterState, loads, regionFinder, rackManager);
if (!needsBalance(cluster)) {
return null;
}
@ -420,11 +427,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return -1;
}
int n = RANDOM.nextInt(cluster.numServers);
if (cluster.numServers > 1 && cluster.isActiveMaster(n)) {
n = (n + 1) % cluster.numServers;
}
return n;
return RANDOM.nextInt(cluster.numServers);
}
protected int pickRandomRack(Cluster cluster) {
@ -439,9 +442,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
if (cluster.numServers < 2) {
return -1;
}
if (cluster.activeMasterIndex != -1 && cluster.numServers == 2) {
return -1;
}
while (true) {
int otherServerIndex = pickRandomServer(cluster);
if (otherServerIndex != serverIndex) {
@ -530,8 +530,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
int index = 0;
while (servers[index] == null || servers[index] == thisServer
|| cluster.isActiveMaster(index)) {
while (servers[index] == null || servers[index] == thisServer) {
index++;
if (index == servers.length) {
return -1;
@ -544,8 +543,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
int index = servers.length - 1;
while (servers[index] == null || servers[index] == thisServer
|| cluster.isActiveMaster(index)) {
while (servers[index] == null || servers[index] == thisServer) {
index--;
if (index < 0) {
return -1;
@ -801,9 +799,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
double total = getSum(stats);
double count = stats.length;
if (stats.length > 1 && cluster.activeMasterIndex != -1) {
count--; // Exclude the active master
}
double mean = total/count;
// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
@ -824,12 +819,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
min = Math.max(0, min);
for (int i=0; i<stats.length; i++) {
if (stats.length > 1 && cluster.isActiveMaster(i)) {
// Not count the active master load
continue;
}
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
@ -897,11 +886,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
double moveCost = cluster.numMovedRegions;
// Don't let this single balance move more than the max moves,
// or move a region that should be on master away from the master.
// It is ok to move any master hosted region back to the master.
// Don't let this single balance move more than the max moves.
// This allows better scaling to accurately represent the actual cost of a move.
if (moveCost > maxMoves || cluster.numMovedMasterHostedRegions > 0) {
if (moveCost > maxMoves) {
return 1000000; // return a number much greater than any of the other cost
}

View File

@ -27,12 +27,12 @@ import java.util.concurrent.locks.Lock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.ConfigUtil;
@ -135,9 +136,10 @@ public class ServerShutdownHandler extends EventHandler {
// we are not ready to assign dead regions either. So we re-queue up
// the dead server for further processing too.
AssignmentManager am = services.getAssignmentManager();
ServerManager serverManager = services.getServerManager();
if (isCarryingMeta() // hbase:meta
|| !am.isFailoverCleanupDone()) {
this.services.getServerManager().processDeadServer(serverName, this.shouldSplitHlog);
serverManager.processDeadServer(serverName, this.shouldSplitHlog);
return;
}
@ -160,6 +162,15 @@ public class ServerShutdownHandler extends EventHandler {
while (!this.server.isStopped()) {
try {
server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
if (BaseLoadBalancer.tablesOnMaster(server.getConfiguration())) {
while (!this.server.isStopped() && serverManager.countOfRegionServers() < 2) {
// Wait till at least another regionserver is up besides the active master
// so that we don't assign all regions to the active master.
// This is best of efforts, because newly joined regionserver
// could crash right after that.
Thread.sleep(100);
}
}
// Skip getting user regions if the server is stopped.
if (!this.server.isStopped()) {
if (ConfigUtil.useZKForAssignment(server.getConfiguration())) {

View File

@ -1106,11 +1106,8 @@ public class HRegionServer extends HasThread implements
}
break;
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
if (sleep(200)) {
interrupted = true;
LOG.warn("Interrupted while sleeping");
}
}
} finally {
@ -1120,6 +1117,17 @@ public class HRegionServer extends HasThread implements
}
}
private boolean sleep(long millis) {
boolean interrupted = false;
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
LOG.warn("Interrupted while sleeping");
interrupted = true;
}
return interrupted;
}
private void closeWAL(final boolean delete) {
if (this.hlogForMeta != null) {
// All hlogs (meta and non-meta) are in the same directory. Don't call
@ -1970,10 +1978,14 @@ public class HRegionServer extends HasThread implements
LOG.debug("No master found and cluster is stopped; bailing out");
return null;
}
LOG.debug("No master found; retry");
previousLogTime = System.currentTimeMillis();
if (System.currentTimeMillis() > (previousLogTime + 1000)) {
LOG.debug("No master found; retry");
previousLogTime = System.currentTimeMillis();
}
refresh = true; // let's try pull it from ZK directly
sleeper.sleep();
if (sleep(200)) {
interrupted = true;
}
continue;
}
@ -1988,24 +2000,18 @@ public class HRegionServer extends HasThread implements
intf = RegionServerStatusService.newBlockingStub(channel);
break;
} catch (IOException e) {
e = e instanceof RemoteException ?
((RemoteException)e).unwrapRemoteException() : e;
if (e instanceof ServerNotRunningYetException) {
if (System.currentTimeMillis() > (previousLogTime+1000)){
if (System.currentTimeMillis() > (previousLogTime + 1000)) {
e = e instanceof RemoteException ?
((RemoteException)e).unwrapRemoteException() : e;
if (e instanceof ServerNotRunningYetException) {
LOG.info("Master isn't available yet, retrying");
previousLogTime = System.currentTimeMillis();
}
} else {
if (System.currentTimeMillis() > (previousLogTime + 1000)) {
} else {
LOG.warn("Unable to connect to master. Retrying. Error was:", e);
previousLogTime = System.currentTimeMillis();
}
previousLogTime = System.currentTimeMillis();
}
try {
Thread.sleep(200);
} catch (InterruptedException ex) {
if (sleep(200)) {
interrupted = true;
LOG.warn("Interrupted while sleeping");
}
}
}

View File

@ -216,8 +216,8 @@ public class BalancerTestBase {
}
protected BaseLoadBalancer.Cluster mockCluster(int[] mockCluster) {
return new BaseLoadBalancer.Cluster(null,
mockClusterServers(mockCluster, -1), null, null, null, null);
return new BaseLoadBalancer.Cluster(
mockClusterServers(mockCluster, -1), null, null, null);
}
protected TreeMap<ServerName, List<HRegionInfo>> mockClusterServers(int[] mockCluster, int numTables) {

View File

@ -242,7 +242,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
// cluster is created (constructor code) would make sure the indices of
// the servers are in the order in which it is inserted in the clusterState
// map (linkedhashmap is important). A similar thing applies to the region lists
Cluster cluster = new Cluster(master, clusterState, null, null, null, rackManager);
Cluster cluster = new Cluster(clusterState, null, null, rackManager);
// check whether a move of region1 from servers[0] to servers[1] would lower
// the availability of region1
assertTrue(cluster.wouldLowerAvailability(hri1, servers[1]));
@ -259,7 +259,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
// now lets have servers[1] host replica_of_region2
list1.add(RegionReplicaUtil.getRegionInfoForReplica(hri3, 1));
// create a new clusterState with the above change
cluster = new Cluster(master, clusterState, null, null, null, rackManager);
cluster = new Cluster(clusterState, null, null, rackManager);
// now check whether a move of a replica from servers[0] to servers[1] would lower
// the availability of region2
assertTrue(cluster.wouldLowerAvailability(hri3, servers[1]));
@ -271,14 +271,14 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2
clusterState.put(servers[10], new ArrayList<HRegionInfo>()); //servers[10], rack3 hosts no region
// create a cluster with the above clusterState
cluster = new Cluster(master, clusterState, null, null, null, rackManager);
cluster = new Cluster(clusterState, null, null, rackManager);
// check whether a move of region1 from servers[0],rack1 to servers[6],rack2 would
// lower the availability
assertTrue(cluster.wouldLowerAvailability(hri1, servers[0]));
// now create a cluster without the rack manager
cluster = new Cluster(master, clusterState, null, null, null, null);
cluster = new Cluster(clusterState, null, null, null);
// now repeat check whether a move of region1 from servers[0] to servers[6] would
// lower the availability
assertTrue(!cluster.wouldLowerAvailability(hri1, servers[6]));
@ -311,7 +311,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
// cluster is created (constructor code) would make sure the indices of
// the servers are in the order in which it is inserted in the clusterState
// map (linkedhashmap is important).
Cluster cluster = new Cluster(master, clusterState, null, null, null, rackManager);
Cluster cluster = new Cluster(clusterState, null, null, rackManager);
// check whether moving region1 from servers[1] to servers[2] would lower availability
assertTrue(!cluster.wouldLowerAvailability(hri1, servers[2]));
@ -331,7 +331,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2
clusterState.put(servers[12], list3); //servers[12], rack3 hosts replica_of_region2
// create a cluster with the above clusterState
cluster = new Cluster(master, clusterState, null, null, null, rackManager);
cluster = new Cluster(clusterState, null, null, rackManager);
// check whether a move of replica_of_region2 from servers[12],rack3 to servers[0],rack1 would
// lower the availability
assertTrue(!cluster.wouldLowerAvailability(hri4, servers[0]));
@ -417,7 +417,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
assignRegions(regions, oldServers, clusterState);
// should not throw exception:
BaseLoadBalancer.Cluster cluster = new Cluster(null, clusterState, null, null, null, null);
BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, null, null);
assertEquals(101 + 9, cluster.numRegions);
assertEquals(10, cluster.numServers); // only 10 servers because they share the same host + port
}
@ -459,7 +459,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn(
Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus
BaseLoadBalancer.Cluster cluster = new Cluster(null, clusterState, null, locationFinder, null, null);
BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null);
int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0)); // this is ok, it is just a test
int r1 = ArrayUtils.indexOf(cluster.regions, regions.get(1));

View File

@ -59,7 +59,6 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
private static StochasticLoadBalancer loadBalancer;
private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class);
private static Configuration conf;
private static final ServerName master = ServerName.valueOf("fake-master", 0, 1L);
@BeforeClass
public static void beforeAllTests() throws Exception {
@ -331,7 +330,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
BaseLoadBalancer.Cluster cluster;
cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null);
cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
costFunction.init(cluster);
double costWithoutReplicas = costFunction.cost();
assertEquals(0, costWithoutReplicas, 0);
@ -341,7 +340,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
clusterState.firstEntry().getValue().get(0),1);
clusterState.lastEntry().getValue().add(replica1);
cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null);
cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
costFunction.init(cluster);
double costWith1ReplicaDifferentServer = costFunction.cost();
@ -351,7 +350,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
HRegionInfo replica2 = RegionReplicaUtil.getRegionInfoForReplica(replica1, 2);
clusterState.lastEntry().getValue().add(replica2);
cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null);
cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
costFunction.init(cluster);
double costWith1ReplicaSameServer = costFunction.cost();
@ -374,7 +373,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
entry.getValue().add(replica2);
it.next().getValue().add(replica3); //2nd server
cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null);
cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
costFunction.init(cluster);
double costWith3ReplicasSameServer = costFunction.cost();
@ -388,7 +387,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
clusterState.lastEntry().getValue().add(replica2);
clusterState.lastEntry().getValue().add(replica3);
cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null);
cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null);
costFunction.init(cluster);
double costWith2ReplicasOnTwoServers = costFunction.cost();
@ -408,7 +407,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
// until the step above s1 holds two replicas of a region
regions = randomRegions(1);
map.put(s2, regions);
assertTrue(loadBalancer.needsBalance(new Cluster(master, map, null, null, null, null)));
assertTrue(loadBalancer.needsBalance(new Cluster(map, null, null, null)));
// check for the case where there are two hosts on the same rack and there are two racks
// and both the replicas are on the same rack
map.clear();
@ -419,7 +418,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
map.put(s2, regionsOnS2);
// add another server so that the cluster has some host on another rack
map.put(ServerName.valueOf("host2", 1000, 11111), randomRegions(1));
assertTrue(loadBalancer.needsBalance(new Cluster(master, map, null, null, null,
assertTrue(loadBalancer.needsBalance(new Cluster(map, null, null,
new ForTestRackManagerOne())));
}