HBASE-16942 Add FavoredStochasticLoadBalancer and FN Candidate generators

Signed-off-by: Francis Liu <toffer@apache.org>
This commit is contained in:
Thiruvel Thirumoolan 2017-04-25 18:12:24 -07:00 committed by Francis Liu
parent 177344cdbf
commit 6bad35e728
15 changed files with 1842 additions and 51 deletions

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.favored;
import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@ -29,6 +31,7 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -53,7 +56,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Helper class for {@link FavoredNodeLoadBalancer} that has all the intelligence for racks,
@ -224,7 +226,7 @@ public class FavoredNodeAssignmentHelper {
// If there were fewer servers in one rack, say r3, which had 3 servers, one possible
// placement could be r2:s5, <skip-r3>, r4:s5, r1:s5, r2:s6, <skip-r3> ...
// The regions should be distributed proportionately to the racksizes
void placePrimaryRSAsRoundRobin(Map<ServerName, List<HRegionInfo>> assignmentMap,
public void placePrimaryRSAsRoundRobin(Map<ServerName, List<HRegionInfo>> assignmentMap,
Map<HRegionInfo, ServerName> primaryRSMap, List<HRegionInfo> regions) {
List<String> rackList = new ArrayList<>(rackToRegionServerMap.size());
rackList.addAll(rackToRegionServerMap.keySet());
@ -236,9 +238,8 @@ public class FavoredNodeAssignmentHelper {
}
}
int numIterations = 0;
int firstServerIndex = random.nextInt(maxRackSize);
// Initialize the current processing host index.
int serverIndex = firstServerIndex;
int serverIndex = random.nextInt(maxRackSize);
for (HRegionInfo regionInfo : regions) {
List<ServerName> currentServerList;
String rackName;
@ -282,7 +283,7 @@ public class FavoredNodeAssignmentHelper {
}
}
Map<HRegionInfo, ServerName[]> placeSecondaryAndTertiaryRS(
public Map<HRegionInfo, ServerName[]> placeSecondaryAndTertiaryRS(
Map<HRegionInfo, ServerName> primaryRSMap) {
Map<HRegionInfo, ServerName[]> secondaryAndTertiaryMap = new HashMap<>();
for (Map.Entry<HRegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
@ -291,15 +292,7 @@ public class FavoredNodeAssignmentHelper {
ServerName primaryRS = entry.getValue();
try {
// Create the secondary and tertiary region server pair object.
ServerName[] favoredNodes;
// Get the rack for the primary region server
String primaryRack = getRackOfServer(primaryRS);
if (getTotalNumberOfRacks() == 1) {
favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack);
} else {
favoredNodes = multiRackCase(regionInfo, primaryRS, primaryRack);
}
ServerName[] favoredNodes = getSecondaryAndTertiary(regionInfo, primaryRS);
if (favoredNodes != null) {
secondaryAndTertiaryMap.put(regionInfo, favoredNodes);
LOG.debug("Place the secondary and tertiary region server for region "
@ -314,6 +307,20 @@ public class FavoredNodeAssignmentHelper {
return secondaryAndTertiaryMap;
}
public ServerName[] getSecondaryAndTertiary(HRegionInfo regionInfo, ServerName primaryRS)
throws IOException {
ServerName[] favoredNodes;// Get the rack for the primary region server
String primaryRack = getRackOfServer(primaryRS);
if (getTotalNumberOfRacks() == 1) {
favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack);
} else {
favoredNodes = multiRackCase(regionInfo, primaryRS, primaryRack);
}
return favoredNodes;
}
private Map<ServerName, Set<HRegionInfo>> mapRSToPrimaries(
Map<HRegionInfo, ServerName> primaryRSMap) {
Map<ServerName, Set<HRegionInfo>> primaryServerMap = new HashMap<>();
@ -536,7 +543,7 @@ public class FavoredNodeAssignmentHelper {
return new ServerName[]{ secondaryRS, tertiaryRS };
}
boolean canPlaceFavoredNodes() {
public boolean canPlaceFavoredNodes() {
return (this.servers.size() >= FAVORED_NODES_NUM);
}
@ -554,8 +561,7 @@ public class FavoredNodeAssignmentHelper {
* @param rack rack from a server is needed
* @param skipServerSet the server shouldn't belong to this set
*/
protected ServerName getOneRandomServer(String rack, Set<ServerName> skipServerSet)
throws IOException {
protected ServerName getOneRandomServer(String rack, Set<ServerName> skipServerSet) {
// Is the rack valid? Do we recognize it?
if (rack == null || getServersFromRack(rack) == null ||
@ -759,7 +765,7 @@ public class FavoredNodeAssignmentHelper {
* Choose a random server as primary and then choose secondary and tertiary FN so its spread
* across two racks.
*/
List<ServerName> generateFavoredNodes(HRegionInfo hri) throws IOException {
public List<ServerName> generateFavoredNodes(HRegionInfo hri) throws IOException {
List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM);
ServerName primary = servers.get(random.nextInt(servers.size()));
@ -780,6 +786,54 @@ public class FavoredNodeAssignmentHelper {
}
}
public Map<HRegionInfo, List<ServerName>> generateFavoredNodesRoundRobin(
Map<ServerName, List<HRegionInfo>> assignmentMap, List<HRegionInfo> regions)
throws IOException {
if (regions.size() > 0) {
if (canPlaceFavoredNodes()) {
Map<HRegionInfo, ServerName> primaryRSMap = new HashMap<>();
// Lets try to have an equal distribution for primary favored node
placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
return generateFavoredNodes(primaryRSMap);
} else {
throw new HBaseIOException("Not enough nodes to generate favored nodes");
}
}
return null;
}
/*
* Generate favored nodes for a set of regions when we know where they are currently hosted.
*/
private Map<HRegionInfo, List<ServerName>> generateFavoredNodes(
Map<HRegionInfo, ServerName> primaryRSMap) {
Map<HRegionInfo, List<ServerName>> generatedFavNodes = new HashMap<>();
Map<HRegionInfo, ServerName[]> secondaryAndTertiaryRSMap =
placeSecondaryAndTertiaryRS(primaryRSMap);
for (Entry<HRegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM);
HRegionInfo region = entry.getKey();
ServerName primarySN = entry.getValue();
favoredNodesForRegion.add(ServerName.valueOf(primarySN.getHostname(), primarySN.getPort(),
NON_STARTCODE));
ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(region);
if (secondaryAndTertiaryNodes != null) {
favoredNodesForRegion.add(ServerName.valueOf(
secondaryAndTertiaryNodes[0].getHostname(), secondaryAndTertiaryNodes[0].getPort(),
NON_STARTCODE));
favoredNodesForRegion.add(ServerName.valueOf(
secondaryAndTertiaryNodes[1].getHostname(), secondaryAndTertiaryNodes[1].getPort(),
NON_STARTCODE));
}
generatedFavNodes.put(region, favoredNodesForRegion);
}
return generatedFavNodes;
}
/*
* Get the rack of server from local mapping when present, saves lookup by the RackManager.
*/

View File

@ -158,7 +158,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
@Override
public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
List<ServerName> servers) {
List<ServerName> servers) throws HBaseIOException {
Map<ServerName, List<HRegionInfo>> assignmentMap;
try {
FavoredNodeAssignmentHelper assignmentHelper =
@ -201,7 +201,8 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
}
@Override
public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers)
throws HBaseIOException {
try {
FavoredNodeAssignmentHelper assignmentHelper =
new FavoredNodeAssignmentHelper(servers, rackManager);

View File

@ -18,9 +18,11 @@
*/
package org.apache.hadoop.hbase.favored;
import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
import java.io.IOException;
import java.util.ArrayList;
@ -28,6 +30,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -37,12 +40,14 @@ import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.net.NetUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
@ -66,6 +71,7 @@ public class FavoredNodesManager {
private Map<ServerName, List<HRegionInfo>> teritiaryRSToRegionMap;
private MasterServices masterServices;
private RackManager rackManager;
/**
* Datanode port to be used for Favored Nodes.
@ -78,6 +84,7 @@ public class FavoredNodesManager {
this.primaryRSToRegionMap = new HashMap<>();
this.secondaryRSToRegionMap = new HashMap<>();
this.teritiaryRSToRegionMap = new HashMap<>();
this.rackManager = new RackManager(masterServices.getConfiguration());
}
public void initialize(SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment)
@ -113,6 +120,22 @@ public class FavoredNodesManager {
return !regionInfo.isSystemTable();
}
/**
* Filter and return regions for which favored nodes is not applicable.
*
* @param regions - collection of regions
* @return set of regions for which favored nodes is not applicable
*/
public static Set<HRegionInfo> filterNonFNApplicableRegions(Collection<HRegionInfo> regions) {
Set<HRegionInfo> fnRegions = Sets.newHashSet();
for (HRegionInfo regionInfo : regions) {
if (!isFavoredNodeApplicable(regionInfo)) {
fnRegions.add(regionInfo);
}
}
return fnRegions;
}
/*
* This should only be used when sending FN information to the region servers. Instead of
* sending the region server port, we use the datanode port. This helps in centralizing the DN
@ -126,7 +149,7 @@ public class FavoredNodesManager {
List<ServerName> fnWithDNPort = Lists.newArrayList();
for (ServerName sn : getFavoredNodes(regionInfo)) {
fnWithDNPort.add(ServerName.valueOf(sn.getHostname(), datanodeDataTransferPort,
ServerName.NON_STARTCODE));
NON_STARTCODE));
}
return fnWithDNPort;
}
@ -152,19 +175,19 @@ public class FavoredNodesManager {
+ regionInfo.getRegionNameAsString() + " with " + servers);
}
if (servers.size() != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
throw new IOException("At least " + FavoredNodeAssignmentHelper.FAVORED_NODES_NUM
if (servers.size() != FAVORED_NODES_NUM) {
throw new IOException("At least " + FAVORED_NODES_NUM
+ " favored nodes should be present for region : " + regionInfo.getEncodedName()
+ " current FN servers:" + servers);
}
List<ServerName> serversWithNoStartCodes = Lists.newArrayList();
for (ServerName sn : servers) {
if (sn.getStartcode() == ServerName.NON_STARTCODE) {
if (sn.getStartcode() == NON_STARTCODE) {
serversWithNoStartCodes.add(sn);
} else {
serversWithNoStartCodes.add(ServerName.valueOf(sn.getHostname(), sn.getPort(),
ServerName.NON_STARTCODE));
NON_STARTCODE));
}
}
regionToFavoredNodes.put(regionInfo, serversWithNoStartCodes);
@ -186,7 +209,7 @@ public class FavoredNodesManager {
private synchronized void addToReplicaLoad(HRegionInfo hri, List<ServerName> servers) {
ServerName serverToUse = ServerName.valueOf(servers.get(PRIMARY.ordinal()).getHostAndPort(),
ServerName.NON_STARTCODE);
NON_STARTCODE);
List<HRegionInfo> regionList = primaryRSToRegionMap.get(serverToUse);
if (regionList == null) {
regionList = new ArrayList<>();
@ -195,7 +218,7 @@ public class FavoredNodesManager {
primaryRSToRegionMap.put(serverToUse, regionList);
serverToUse = ServerName
.valueOf(servers.get(SECONDARY.ordinal()).getHostAndPort(), ServerName.NON_STARTCODE);
.valueOf(servers.get(SECONDARY.ordinal()).getHostAndPort(), NON_STARTCODE);
regionList = secondaryRSToRegionMap.get(serverToUse);
if (regionList == null) {
regionList = new ArrayList<>();
@ -204,7 +227,7 @@ public class FavoredNodesManager {
secondaryRSToRegionMap.put(serverToUse, regionList);
serverToUse = ServerName.valueOf(servers.get(TERTIARY.ordinal()).getHostAndPort(),
ServerName.NON_STARTCODE);
NON_STARTCODE);
regionList = teritiaryRSToRegionMap.get(serverToUse);
if (regionList == null) {
regionList = new ArrayList<>();
@ -213,6 +236,39 @@ public class FavoredNodesManager {
teritiaryRSToRegionMap.put(serverToUse, regionList);
}
/*
* Get the replica count for the servers provided.
*
* For each server, replica count includes three counts for primary, secondary and tertiary.
* If a server is the primary favored node for 10 regions, secondary for 5 and tertiary
* for 1, then the list would be [10, 5, 1]. If the server is newly added to the cluster is
* not a favored node for any region, the replica count would be [0, 0, 0].
*/
public synchronized Map<ServerName, List<Integer>> getReplicaLoad(List<ServerName> servers) {
Map<ServerName, List<Integer>> result = Maps.newHashMap();
for (ServerName sn : servers) {
ServerName serverWithNoStartCode = ServerName.valueOf(sn.getHostAndPort(), NON_STARTCODE);
List<Integer> countList = Lists.newArrayList();
if (primaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
countList.add(primaryRSToRegionMap.get(serverWithNoStartCode).size());
} else {
countList.add(0);
}
if (secondaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
countList.add(secondaryRSToRegionMap.get(serverWithNoStartCode).size());
} else {
countList.add(0);
}
if (teritiaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
countList.add(teritiaryRSToRegionMap.get(serverWithNoStartCode).size());
} else {
countList.add(0);
}
result.put(sn, countList);
}
return result;
}
public synchronized void deleteFavoredNodesForRegions(Collection<HRegionInfo> regionInfoList) {
for (HRegionInfo hri : regionInfoList) {
List<ServerName> favNodes = getFavoredNodes(hri);
@ -230,4 +286,8 @@ public class FavoredNodesManager {
}
}
}
public RackManager getRackManager() {
return rackManager;
}
}

View File

@ -27,6 +27,9 @@ import org.apache.hadoop.hbase.ServerName;
@InterfaceAudience.Private
public interface FavoredNodesPromoter {
/* Try and assign regions even if favored nodes are dead */
String FAVORED_ALWAYS_ASSIGN_REGIONS = "hbase.favored.assignment.always.assign";
void generateFavoredNodesForDaughter(List<ServerName> servers,
HRegionInfo parent, HRegionInfo hriA, HRegionInfo hriB) throws IOException;

View File

@ -1224,7 +1224,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
*/
@Override
public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
List<ServerName> servers) {
List<ServerName> servers) throws HBaseIOException {
metricsBalancer.incrMiscInvocations();
Map<ServerName, List<HRegionInfo>> assignments = assignMasterRegions(regions, servers);
if (assignments != null && !assignments.isEmpty()) {
@ -1335,7 +1335,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
* Used to assign a single region to a random server.
*/
@Override
public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers)
throws HBaseIOException {
metricsBalancer.incrMiscInvocations();
if (servers != null && servers.contains(masterServerName)) {
if (shouldBeOnMaster(regionInfo)) {
@ -1384,7 +1385,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
*/
@Override
public Map<ServerName, List<HRegionInfo>> retainAssignment(Map<HRegionInfo, ServerName> regions,
List<ServerName> servers) {
List<ServerName> servers) throws HBaseIOException {
// Update metrics
metricsBalancer.incrMiscInvocations();
Map<ServerName, List<HRegionInfo>> assignments

View File

@ -0,0 +1,730 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that
* assigns favored nodes for each region. There is a Primary RegionServer that hosts
* the region, and then there is Secondary and Tertiary RegionServers. Currently, the
* favored nodes information is used in creating HDFS files - the Primary RegionServer
* passes the primary, secondary, tertiary node addresses as hints to the
* DistributedFileSystem API for creating files on the filesystem. These nodes are
* treated as hints by the HDFS to place the blocks of the file. This alleviates the
* problem to do with reading from remote nodes (since we can make the Secondary
* RegionServer as the new Primary RegionServer) after a region is recovered. This
* should help provide consistent read latencies for the regions even when their
* primary region servers die. This provides two
* {@link org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CandidateGenerator}
*
*/
public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
FavoredNodesPromoter {
private static final Log LOG = LogFactory.getLog(FavoredStochasticBalancer.class);
private FavoredNodesManager fnm;
@Override
public void initialize() throws HBaseIOException {
configureGenerators();
super.initialize();
}
protected void configureGenerators() {
List<CandidateGenerator> fnPickers = new ArrayList<>(2);
fnPickers.add(new FavoredNodeLoadPicker());
fnPickers.add(new FavoredNodeLocalityPicker());
setCandidateGenerators(fnPickers);
}
@Override
public void setMasterServices(MasterServices masterServices) {
super.setMasterServices(masterServices);
fnm = masterServices.getFavoredNodesManager();
}
/*
* Round robin assignment: Segregate the regions into two types:
*
* 1. The regions that have favored node assignment where at least one of the favored node
* is still alive. In this case, try to adhere to the current favored nodes assignment as
* much as possible - i.e., if the current primary is gone, then make the secondary or
* tertiary as the new host for the region (based on their current load). Note that we don't
* change the favored node assignments here (even though one or more favored node is
* currently down). That will be done by the admin operations.
*
* 2. The regions that currently don't have favored node assignments. Generate favored nodes
* for them and then assign. Generate the primary fn in round robin fashion and generate
* secondary and tertiary as per favored nodes constraints.
*/
@Override
public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
List<ServerName> servers) throws HBaseIOException {
metricsBalancer.incrMiscInvocations();
Set<HRegionInfo> regionSet = Sets.newHashSet(regions);
Map<ServerName, List<HRegionInfo>> assignmentMap = assignMasterRegions(regions, servers);
if (assignmentMap != null && !assignmentMap.isEmpty()) {
servers = new ArrayList<>(servers);
// Guarantee not to put other regions on master
servers.remove(masterServerName);
List<HRegionInfo> masterRegions = assignmentMap.get(masterServerName);
if (!masterRegions.isEmpty()) {
for (HRegionInfo region: masterRegions) {
regionSet.remove(region);
}
}
}
if (regionSet.isEmpty()) {
return assignmentMap;
}
try {
FavoredNodeAssignmentHelper helper =
new FavoredNodeAssignmentHelper(servers, fnm.getRackManager());
helper.initialize();
Set<HRegionInfo> systemRegions = FavoredNodesManager.filterNonFNApplicableRegions(regionSet);
regionSet.removeAll(systemRegions);
// Assign all system regions
Map<ServerName, List<HRegionInfo>> systemAssignments =
super.roundRobinAssignment(Lists.newArrayList(systemRegions), servers);
// Segregate favored and non-favored nodes regions and assign accordingly.
Pair<Map<ServerName,List<HRegionInfo>>, List<HRegionInfo>> segregatedRegions =
segregateRegionsAndAssignRegionsWithFavoredNodes(regionSet, servers);
Map<ServerName, List<HRegionInfo>> regionsWithFavoredNodesMap = segregatedRegions.getFirst();
Map<ServerName, List<HRegionInfo>> regionsWithoutFN =
generateFNForRegionsWithoutFN(helper, segregatedRegions.getSecond());
// merge the assignment maps
mergeAssignmentMaps(assignmentMap, systemAssignments);
mergeAssignmentMaps(assignmentMap, regionsWithFavoredNodesMap);
mergeAssignmentMaps(assignmentMap, regionsWithoutFN);
} catch (Exception ex) {
throw new HBaseIOException("Encountered exception while doing favored-nodes assignment "
+ ex + " Falling back to regular assignment", ex);
}
return assignmentMap;
}
private void mergeAssignmentMaps(Map<ServerName, List<HRegionInfo>> assignmentMap,
Map<ServerName, List<HRegionInfo>> otherAssignments) {
if (otherAssignments == null || otherAssignments.isEmpty()) {
return;
}
for (Entry<ServerName, List<HRegionInfo>> entry : otherAssignments.entrySet()) {
ServerName sn = entry.getKey();
List<HRegionInfo> regionsList = entry.getValue();
if (assignmentMap.get(sn) == null) {
assignmentMap.put(sn, Lists.newArrayList(regionsList));
} else {
assignmentMap.get(sn).addAll(regionsList);
}
}
}
private Map<ServerName, List<HRegionInfo>> generateFNForRegionsWithoutFN(
FavoredNodeAssignmentHelper helper, List<HRegionInfo> regions) throws IOException {
Map<ServerName, List<HRegionInfo>> assignmentMap = Maps.newHashMap();
Map<HRegionInfo, List<ServerName>> regionsNoFNMap;
if (regions.size() > 0) {
regionsNoFNMap = helper.generateFavoredNodesRoundRobin(assignmentMap, regions);
fnm.updateFavoredNodes(regionsNoFNMap);
}
return assignmentMap;
}
/*
* Return a pair - one with assignments when favored nodes are present and another with regions
* without favored nodes.
*/
private Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>>
segregateRegionsAndAssignRegionsWithFavoredNodes(Collection<HRegionInfo> regions,
List<ServerName> onlineServers) throws HBaseIOException {
// Since we expect FN to be present most of the time, lets create map with same size
Map<ServerName, List<HRegionInfo>> assignmentMapForFavoredNodes =
new HashMap<>(onlineServers.size());
List<HRegionInfo> regionsWithNoFavoredNodes = new ArrayList<>();
for (HRegionInfo region : regions) {
List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
ServerName primaryHost = null;
ServerName secondaryHost = null;
ServerName tertiaryHost = null;
if (favoredNodes != null && !favoredNodes.isEmpty()) {
for (ServerName s : favoredNodes) {
ServerName serverWithLegitStartCode = getServerFromFavoredNode(onlineServers, s);
if (serverWithLegitStartCode != null) {
FavoredNodesPlan.Position position =
FavoredNodesPlan.getFavoredServerPosition(favoredNodes, s);
if (Position.PRIMARY.equals(position)) {
primaryHost = serverWithLegitStartCode;
} else if (Position.SECONDARY.equals(position)) {
secondaryHost = serverWithLegitStartCode;
} else if (Position.TERTIARY.equals(position)) {
tertiaryHost = serverWithLegitStartCode;
}
}
}
assignRegionToAvailableFavoredNode(assignmentMapForFavoredNodes, region, primaryHost,
secondaryHost, tertiaryHost);
} else {
regionsWithNoFavoredNodes.add(region);
}
}
return new Pair<>(assignmentMapForFavoredNodes, regionsWithNoFavoredNodes);
}
private void addRegionToMap(Map<ServerName, List<HRegionInfo>> assignmentMapForFavoredNodes,
HRegionInfo region, ServerName host) {
List<HRegionInfo> regionsOnServer;
if ((regionsOnServer = assignmentMapForFavoredNodes.get(host)) == null) {
regionsOnServer = Lists.newArrayList();
assignmentMapForFavoredNodes.put(host, regionsOnServer);
}
regionsOnServer.add(region);
}
/*
* Get the ServerName for the FavoredNode. Since FN's startcode is -1, we could want to get the
* ServerName with the correct start code from the list of provided servers.
*/
private ServerName getServerFromFavoredNode(List<ServerName> servers, ServerName fn) {
for (ServerName server : servers) {
if (ServerName.isSameHostnameAndPort(fn, server)) {
return server;
}
}
return null;
}
/*
* Assign the region to primary if its available. If both secondary and tertiary are available,
* assign to the host which has less load. Else assign to secondary or tertiary whichever is
* available (in that order).
*/
private void assignRegionToAvailableFavoredNode(
Map<ServerName, List<HRegionInfo>> assignmentMapForFavoredNodes, HRegionInfo region,
ServerName primaryHost, ServerName secondaryHost, ServerName tertiaryHost) {
if (primaryHost != null) {
addRegionToMap(assignmentMapForFavoredNodes, region, primaryHost);
} else if (secondaryHost != null && tertiaryHost != null) {
// Assign the region to the one with a lower load (both have the desired hdfs blocks)
ServerName s;
ServerLoad tertiaryLoad = super.services.getServerManager().getLoad(tertiaryHost);
ServerLoad secondaryLoad = super.services.getServerManager().getLoad(secondaryHost);
if (secondaryLoad != null && tertiaryLoad != null) {
if (secondaryLoad.getLoad() < tertiaryLoad.getLoad()) {
s = secondaryHost;
} else {
s = tertiaryHost;
}
} else {
// We don't have one/more load, lets just choose a random node
s = RANDOM.nextBoolean() ? secondaryHost : tertiaryHost;
}
addRegionToMap(assignmentMapForFavoredNodes, region, s);
} else if (secondaryHost != null) {
addRegionToMap(assignmentMapForFavoredNodes, region, secondaryHost);
} else if (tertiaryHost != null) {
addRegionToMap(assignmentMapForFavoredNodes, region, tertiaryHost);
} else {
// No favored nodes are online, lets assign to BOGUS server
addRegionToMap(assignmentMapForFavoredNodes, region, BOGUS_SERVER_NAME);
}
}
/*
* If we have favored nodes for a region, we will return one of the FN as destination. If
* favored nodes are not present for a region, we will generate and return one of the FN as
* destination. If we can't generate anything, lets fallback.
*/
@Override
public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers)
throws HBaseIOException {
if (servers != null && servers.contains(masterServerName)) {
if (shouldBeOnMaster(regionInfo)) {
metricsBalancer.incrMiscInvocations();
return masterServerName;
}
servers = new ArrayList<>(servers);
// Guarantee not to put other regions on master
servers.remove(masterServerName);
}
ServerName destination = null;
if (!FavoredNodesManager.isFavoredNodeApplicable(regionInfo)) {
return super.randomAssignment(regionInfo, servers);
}
metricsBalancer.incrMiscInvocations();
List<ServerName> favoredNodes = fnm.getFavoredNodes(regionInfo);
if (favoredNodes == null || favoredNodes.isEmpty()) {
// Generate new favored nodes and return primary
FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, getConf());
helper.initialize();
try {
favoredNodes = helper.generateFavoredNodes(regionInfo);
updateFavoredNodesForRegion(regionInfo, favoredNodes);
} catch (IOException e) {
LOG.warn("Encountered exception while doing favored-nodes (random)assignment " + e);
throw new HBaseIOException(e);
}
}
List<ServerName> onlineServers = getOnlineFavoredNodes(servers, favoredNodes);
if (onlineServers.size() > 0) {
destination = onlineServers.get(RANDOM.nextInt(onlineServers.size()));
}
boolean alwaysAssign = getConf().getBoolean(FAVORED_ALWAYS_ASSIGN_REGIONS, true);
if (destination == null && alwaysAssign) {
LOG.warn("Can't generate FN for region: " + regionInfo + " falling back");
destination = super.randomAssignment(regionInfo, servers);
}
return destination;
}
private void updateFavoredNodesForRegion(HRegionInfo regionInfo, List<ServerName> newFavoredNodes)
throws IOException {
Map<HRegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
regionFNMap.put(regionInfo, newFavoredNodes);
fnm.updateFavoredNodes(regionFNMap);
}
/*
* Reuse BaseLoadBalancer's retainAssignment, but generate favored nodes when its missing.
*/
@Override
public Map<ServerName, List<HRegionInfo>> retainAssignment(Map<HRegionInfo, ServerName> regions,
List<ServerName> servers) throws HBaseIOException {
Map<ServerName, List<HRegionInfo>> assignmentMap = Maps.newHashMap();
Map<ServerName, List<HRegionInfo>> result = super.retainAssignment(regions, servers);
if (result == null || result.isEmpty()) {
LOG.warn("Nothing to assign to, probably no servers or no regions");
return null;
}
// Guarantee not to put other regions on master
if (servers != null && servers.contains(masterServerName)) {
servers = new ArrayList<>(servers);
servers.remove(masterServerName);
}
// Lets check if favored nodes info is in META, if not generate now.
FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, getConf());
helper.initialize();
LOG.debug("Generating favored nodes for regions missing them.");
Map<HRegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
try {
for (Entry<ServerName, List<HRegionInfo>> entry : result.entrySet()) {
ServerName sn = entry.getKey();
ServerName primary = ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE);
for (HRegionInfo hri : entry.getValue()) {
if (FavoredNodesManager.isFavoredNodeApplicable(hri)) {
List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
if (favoredNodes == null || favoredNodes.size() < FAVORED_NODES_NUM) {
LOG.debug("Generating favored nodes for: " + hri + " with primary: " + primary);
ServerName[] secondaryAndTertiaryNodes = helper.getSecondaryAndTertiary(hri, primary);
if (secondaryAndTertiaryNodes != null && secondaryAndTertiaryNodes.length == 2) {
List<ServerName> newFavoredNodes = Lists.newArrayList();
newFavoredNodes.add(primary);
newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(),
secondaryAndTertiaryNodes[0].getPort(), NON_STARTCODE));
newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(),
secondaryAndTertiaryNodes[1].getPort(), NON_STARTCODE));
regionFNMap.put(hri, newFavoredNodes);
addRegionToMap(assignmentMap, hri, sn);
} else {
throw new HBaseIOException("Cannot generate secondary/tertiary FN for " + hri
+ " generated "
+ (secondaryAndTertiaryNodes != null ? secondaryAndTertiaryNodes : " nothing"));
}
} else {
List<ServerName> onlineFN = getOnlineFavoredNodes(servers, favoredNodes);
if (onlineFN.isEmpty()) {
// All favored nodes are dead, lets assign it to BOGUS
addRegionToMap(assignmentMap, hri, BOGUS_SERVER_NAME);
} else {
// Is primary not on FN? Less likely, but we can still take care of this.
if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, sn) != null) {
addRegionToMap(assignmentMap, hri, sn);
} else {
ServerName destination = onlineFN.get(RANDOM.nextInt(onlineFN.size()));
LOG.warn("Region: " + hri + " not hosted on favored nodes: " + favoredNodes
+ " current: " + sn + " moving to: " + destination);
addRegionToMap(assignmentMap, hri, destination);
}
}
}
} else {
addRegionToMap(assignmentMap, hri, sn);
}
}
}
if (!regionFNMap.isEmpty()) {
LOG.debug("Updating FN in meta for missing regions, count: " + regionFNMap.size());
fnm.updateFavoredNodes(regionFNMap);
}
} catch (IOException e) {
throw new HBaseIOException("Cannot generate/update FN for regions: " + regionFNMap.keySet());
}
return assignmentMap;
}
/*
* Return list of favored nodes that are online.
*/
private List<ServerName> getOnlineFavoredNodes(List<ServerName> onlineServers,
List<ServerName> serversWithoutStartCodes) {
if (serversWithoutStartCodes == null) {
return null;
} else {
List<ServerName> result = Lists.newArrayList();
for (ServerName sn : serversWithoutStartCodes) {
for (ServerName online : onlineServers) {
if (ServerName.isSameHostnameAndPort(sn, online)) {
result.add(online);
}
}
}
return result;
}
}
/*
* Generate Favored Nodes for daughters during region split.
*
* If the parent does not have FN, regenerates them for the daughters.
*
* If the parent has FN, inherit two FN from parent for each daughter and generate the remaining.
* The primary FN for both the daughters should be the same as parent. Inherit the secondary
* FN from the parent but keep it different for each daughter. Choose the remaining FN
* randomly. This would give us better distribution over a period of time after enough splits.
*/
@Override
public void generateFavoredNodesForDaughter(List<ServerName> servers, HRegionInfo parent,
HRegionInfo regionA, HRegionInfo regionB) throws IOException {
Map<HRegionInfo, List<ServerName>> result = new HashMap<>();
FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
helper.initialize();
List<ServerName> parentFavoredNodes = fnm.getFavoredNodes(parent);
if (parentFavoredNodes == null) {
LOG.debug("Unable to find favored nodes for parent, " + parent
+ " generating new favored nodes for daughter");
result.put(regionA, helper.generateFavoredNodes(regionA));
result.put(regionB, helper.generateFavoredNodes(regionB));
} else {
// Lets get the primary and secondary from parent for regionA
Set<ServerName> regionAFN =
getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, SECONDARY);
result.put(regionA, Lists.newArrayList(regionAFN));
// Lets get the primary and tertiary from parent for regionB
Set<ServerName> regionBFN =
getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, TERTIARY);
result.put(regionB, Lists.newArrayList(regionBFN));
}
fnm.updateFavoredNodes(result);
}
private Set<ServerName> getInheritedFNForDaughter(FavoredNodeAssignmentHelper helper,
List<ServerName> parentFavoredNodes, Position primary, Position secondary)
throws IOException {
Set<ServerName> daughterFN = Sets.newLinkedHashSet();
if (parentFavoredNodes.size() >= primary.ordinal()) {
daughterFN.add(parentFavoredNodes.get(primary.ordinal()));
}
if (parentFavoredNodes.size() >= secondary.ordinal()) {
daughterFN.add(parentFavoredNodes.get(secondary.ordinal()));
}
while (daughterFN.size() < FAVORED_NODES_NUM) {
ServerName newNode = helper.generateMissingFavoredNode(Lists.newArrayList(daughterFN));
daughterFN.add(newNode);
}
return daughterFN;
}
/*
* Generate favored nodes for a region during merge. Choose the FN from one of the sources to
* keep it simple.
*/
@Override
public void generateFavoredNodesForMergedRegion(HRegionInfo merged, HRegionInfo regionA,
HRegionInfo regionB) throws IOException {
updateFavoredNodesForRegion(merged, fnm.getFavoredNodes(regionA));
}
/*
* Pick favored nodes with the highest locality for a region with lowest locality.
*/
private class FavoredNodeLocalityPicker extends CandidateGenerator {
@Override
protected Cluster.Action generate(Cluster cluster) {
int thisServer = pickRandomServer(cluster);
int thisRegion;
if (thisServer == -1) {
LOG.trace("Could not pick lowest local region server");
return Cluster.NullAction;
} else {
// Pick lowest local region on this server
thisRegion = pickLowestLocalRegionOnServer(cluster, thisServer);
}
if (thisRegion == -1) {
if (cluster.regionsPerServer[thisServer].length > 0) {
LOG.trace("Could not pick lowest local region even when region server held "
+ cluster.regionsPerServer[thisServer].length + " regions");
}
return Cluster.NullAction;
}
HRegionInfo hri = cluster.regions[thisRegion];
List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
int otherServer;
if (favoredNodes == null) {
if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
otherServer = pickOtherRandomServer(cluster, thisServer);
} else {
// No FN, ignore
LOG.trace("Ignoring, no favored nodes for region: " + hri);
return Cluster.NullAction;
}
} else {
// Pick other favored node with the highest locality
otherServer = getDifferentFavoredNode(cluster, favoredNodes, thisServer);
}
return getAction(thisServer, thisRegion, otherServer, -1);
}
private int getDifferentFavoredNode(Cluster cluster, List<ServerName> favoredNodes,
int currentServer) {
List<Integer> fnIndex = new ArrayList<>();
for (ServerName sn : favoredNodes) {
if (cluster.serversToIndex.containsKey(sn.getHostAndPort())) {
fnIndex.add(cluster.serversToIndex.get(sn.getHostAndPort()));
}
}
float locality = 0;
int highestLocalRSIndex = -1;
for (Integer index : fnIndex) {
if (index != currentServer) {
float temp = cluster.localityPerServer[index];
if (temp >= locality) {
locality = temp;
highestLocalRSIndex = index;
}
}
}
return highestLocalRSIndex;
}
private int pickLowestLocalRegionOnServer(Cluster cluster, int server) {
return cluster.getLowestLocalityRegionOnServer(server);
}
}
/*
* This is like LoadCandidateGenerator, but we choose appropriate FN for the region on the
* most loaded server.
*/
class FavoredNodeLoadPicker extends CandidateGenerator {
@Override
Cluster.Action generate(Cluster cluster) {
cluster.sortServersByRegionCount();
int thisServer = pickMostLoadedServer(cluster);
int thisRegion = pickRandomRegion(cluster, thisServer, 0);
HRegionInfo hri = cluster.regions[thisRegion];
int otherServer;
List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
if (favoredNodes == null) {
if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
otherServer = pickLeastLoadedServer(cluster, thisServer);
} else {
return Cluster.NullAction;
}
} else {
otherServer = pickLeastLoadedFNServer(cluster, favoredNodes, thisServer);
}
return getAction(thisServer, thisRegion, otherServer, -1);
}
private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
int index;
for (index = 0; index < servers.length ; index++) {
if ((servers[index] != null) && servers[index] != thisServer) {
break;
}
}
return servers[index];
}
private int pickLeastLoadedFNServer(final Cluster cluster, List<ServerName> favoredNodes,
int currentServerIndex) {
List<Integer> fnIndex = new ArrayList<>();
for (ServerName sn : favoredNodes) {
if (cluster.serversToIndex.containsKey(sn.getHostAndPort())) {
fnIndex.add(cluster.serversToIndex.get(sn.getHostAndPort()));
}
}
int leastLoadedFN = -1;
int load = Integer.MAX_VALUE;
for (Integer index : fnIndex) {
if (index != currentServerIndex) {
int temp = cluster.getNumRegions(index);
if (temp < load) {
load = temp;
leastLoadedFN = index;
}
}
}
return leastLoadedFN;
}
private int pickMostLoadedServer(final Cluster cluster) {
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
int index;
for (index = servers.length - 1; index > 0 ; index--) {
if (servers[index] != null) {
break;
}
}
return servers[index];
}
}
/*
* For all regions correctly assigned to favored nodes, we just use the stochastic balancer
* implementation. For the misplaced regions, we assign a bogus server to it and AM takes care.
*/
@Override
public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
if (this.services != null) {
List<RegionPlan> regionPlans = Lists.newArrayList();
Map<ServerName, List<HRegionInfo>> correctAssignments = new HashMap<>();
int misplacedRegions = 0;
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
ServerName current = entry.getKey();
List<HRegionInfo> regions = Lists.newArrayList();
correctAssignments.put(current, regions);
for (HRegionInfo hri : entry.getValue()) {
List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, current) != null ||
!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
regions.add(hri);
} else {
// No favored nodes, lets unassign.
LOG.warn("Region not on favored nodes, unassign. Region: " + hri
+ " current: " + current + " favored nodes: " + favoredNodes);
this.services.getAssignmentManager().unassign(hri);
RegionPlan rp = new RegionPlan(hri, null, null);
regionPlans.add(rp);
misplacedRegions++;
}
}
}
LOG.debug("Found misplaced regions: " + misplacedRegions + ", not on favored nodes.");
List<RegionPlan> regionPlansFromBalance = super.balanceCluster(correctAssignments);
if (regionPlansFromBalance != null) {
regionPlans.addAll(regionPlansFromBalance);
}
return regionPlans;
} else {
return super.balanceCluster(clusterState);
}
}
}

View File

@ -28,9 +28,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -103,6 +103,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
"hbase.master.balancer.stochastic.stepsPerRegion";
protected static final String MAX_STEPS_KEY =
"hbase.master.balancer.stochastic.maxSteps";
protected static final String RUN_MAX_STEPS_KEY =
"hbase.master.balancer.stochastic.runMaxSteps";
protected static final String MAX_RUNNING_TIME_KEY =
"hbase.master.balancer.stochastic.maxRunningTime";
protected static final String KEEP_REGION_LOADS =
@ -111,19 +113,20 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
protected static final String MIN_COST_NEED_BALANCE_KEY =
"hbase.master.balancer.stochastic.minCostNeedBalance";
private static final Random RANDOM = new Random(System.currentTimeMillis());
protected static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
// values are defaults
private int maxSteps = 1000000;
private boolean runMaxSteps = false;
private int stepsPerRegion = 800;
private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
private int numRegionLoadsToRemember = 15;
private float minCostNeedBalance = 0.05f;
private CandidateGenerator[] candidateGenerators;
private List<CandidateGenerator> candidateGenerators;
private CostFromRegionLoadFunction[] regionLoadFunctions;
private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
@ -160,6 +163,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
@ -167,13 +172,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
}
localityCost = new LocalityCostFunction(conf, services);
if (candidateGenerators == null) {
candidateGenerators = new CandidateGenerator[] {
new RandomCandidateGenerator(),
new LoadCandidateGenerator(),
localityCandidateGenerator,
new RegionReplicaRackCandidateGenerator(),
};
if (this.candidateGenerators == null) {
candidateGenerators = Lists.newArrayList();
candidateGenerators.add(new RandomCandidateGenerator());
candidateGenerators.add(new LoadCandidateGenerator());
candidateGenerators.add(localityCandidateGenerator);
candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
}
regionLoadFunctions = new CostFromRegionLoadFunction[] {
new ReadRequestCostFunction(conf),
@ -202,6 +207,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc.");
}
protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
this.candidateGenerators = customCandidateGenerators;
}
@Override
protected void setSlop(Configuration conf) {
this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
@ -352,14 +361,20 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
double initCost = currentCost;
double newCost = currentCost;
long computedMaxSteps = Math.min(this.maxSteps,
((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
long computedMaxSteps;
if (runMaxSteps) {
computedMaxSteps = Math.max(this.maxSteps,
((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
} else {
computedMaxSteps = Math.min(this.maxSteps,
((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
}
// Perform a stochastic walk to see if we can get a good fit.
long step;
for (step = 0; step < computedMaxSteps; step++) {
int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
CandidateGenerator p = candidateGenerators[generatorIdx];
int generatorIdx = RANDOM.nextInt(candidateGenerators.size());
CandidateGenerator p = candidateGenerators.get(generatorIdx);
Cluster.Action action = p.generate(cluster);
if (action.type == Type.NULL) {

View File

@ -354,6 +354,35 @@ public class MiniHBaseCluster extends HBaseCluster {
return t;
}
/**
* Starts a region server thread and waits until its processed by master. Throws an exception
* when it can't start a region server or when the region server is not processed by master
* within the timeout.
*
* @return New RegionServerThread
* @throws IOException
*/
public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
throws IOException {
JVMClusterUtil.RegionServerThread t = startRegionServer();
ServerName rsServerName = t.getRegionServer().getServerName();
long start = System.currentTimeMillis();
ClusterStatus clusterStatus = getClusterStatus();
while ((System.currentTimeMillis() - start) < timeout) {
if (clusterStatus != null && clusterStatus.getServers().contains(rsServerName)) {
return t;
}
Threads.sleep(100);
}
if (t.getRegionServer().isOnline()) {
throw new IOException("RS: " + rsServerName + " online, but not processed by master");
} else {
throw new IOException("RS: " + rsServerName + " is offline");
}
}
/**
* Cause a region server to exit doing basic clean up only on its way out.
* @param serverNumber Used as index into a list.

View File

@ -577,7 +577,7 @@ public class TestZooKeeper {
@Override
public Map<ServerName, List<HRegionInfo>> retainAssignment(
Map<HRegionInfo, ServerName> regions, List<ServerName> servers) {
Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
retainAssignCalled = true;
return super.retainAssignment(regions, servers);
}

View File

@ -37,7 +37,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.favored.FavoredNodeLoadBalancer;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -50,6 +49,7 @@ import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.LoadOnlyFavoredStochasticBalancer;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -90,7 +90,7 @@ public class TestTableFavoredNodes {
Configuration conf = TEST_UTIL.getConfiguration();
// Setting FavoredNodeBalancer will enable favored nodes
conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
FavoredNodeLoadBalancer.class, LoadBalancer.class);
LoadOnlyFavoredStochasticBalancer.class, LoadBalancer.class);
conf.set(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, "" + SLAVES);
// This helps test if RS get the appropriate FN updates.

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@ -1267,7 +1268,7 @@ public class TestAssignmentManagerOnCluster {
@Override
public ServerName randomAssignment(HRegionInfo regionInfo,
List<ServerName> servers) {
List<ServerName> servers) throws HBaseIOException {
if (regionInfo.equals(controledRegion)) {
return null;
}
@ -1276,7 +1277,7 @@ public class TestAssignmentManagerOnCluster {
@Override
public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(
List<HRegionInfo> regions, List<ServerName> servers) {
List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
if (countRegionServers != null && services != null) {
int regionServers = services.getServerManager().countOfRegionServers();
if (regionServers < countRegionServers.intValue()) {
@ -1296,7 +1297,7 @@ public class TestAssignmentManagerOnCluster {
@Override
public Map<ServerName, List<HRegionInfo>> retainAssignment(
Map<HRegionInfo, ServerName> regions, List<ServerName> servers) {
Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
for (HRegionInfo hri : regions.keySet()) {
if (hri.equals(controledRegion)) {
Map<ServerName, List<HRegionInfo>> m = Maps.newHashMap();

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.List;
import com.google.common.collect.Lists;
/**
* Used for FavoredNode unit tests
*/
public class LoadOnlyFavoredStochasticBalancer extends FavoredStochasticBalancer {
@Override
protected void configureGenerators() {
List<CandidateGenerator> fnPickers = Lists.newArrayList();
fnPickers.add(new FavoredNodeLoadPicker());
setCandidateGenerators(fnPickers);
}
}

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/*
* This case tests a scenario when a cluster with tables is moved from Stochastic Load Balancer
* to FavoredStochasticLoadBalancer and the generation of favored nodes after switch.
*/
@Category(MediumTests.class)
public class TestFavoredNodeTableImport {
private static final Log LOG = LogFactory.getLog(TestFavoredNodeTableImport.class);
private static final int SLAVES = 3;
private static final int REGION_NUM = 20;
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final Configuration conf = UTIL.getConfiguration();
@After
public void stopCluster() throws Exception {
UTIL.cleanupTestDir();
UTIL.shutdownMiniCluster();
}
@Test
public void testTableCreation() throws Exception {
conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, StochasticLoadBalancer.class.getName());
LOG.info("Starting up cluster");
UTIL.startMiniCluster(SLAVES);
while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) {
Threads.sleep(1);
}
Admin admin = UTIL.getAdmin();
admin.setBalancerRunning(false, true);
String tableName = "testFNImport";
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
admin.createTable(desc, Bytes.toBytes("a"), Bytes.toBytes("z"), REGION_NUM);
UTIL.waitTableAvailable(desc.getTableName());
LOG.info("Shutting down cluster");
UTIL.shutdownMiniHBaseCluster();
Thread.sleep(2000);
LOG.info("Starting cluster again with FN Balancer");
UTIL.getConfiguration().set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
FavoredStochasticBalancer.class.getName());
UTIL.restartHBaseCluster(SLAVES);
while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) {
Threads.sleep(1);
}
admin = UTIL.getAdmin();
UTIL.waitTableAvailable(desc.getTableName());
FavoredNodesManager fnm = UTIL.getHBaseCluster().getMaster().getFavoredNodesManager();
List<HRegionInfo> regionsOfTable = admin.getTableRegions(TableName.valueOf(tableName));
for (HRegionInfo rInfo : regionsOfTable) {
Set<ServerName> favNodes = Sets.newHashSet(fnm.getFavoredNodes(rInfo));
assertNotNull(favNodes);
assertEquals("Required no of favored nodes not found.", FAVORED_NODES_NUM, favNodes.size());
for (ServerName fn : favNodes) {
assertEquals("StartCode invalid for:" + fn, ServerName.NON_STARTCODE, fn.getStartcode());
}
}
}
}

View File

@ -0,0 +1,203 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
@Category(LargeTests.class)
public class TestFavoredStochasticBalancerPickers extends BalancerTestBase {
private static final Log LOG = LogFactory.getLog(TestFavoredStochasticBalancerPickers.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final int SLAVES = 6;
private static final int REGIONS = SLAVES * 3;
private static Configuration conf;
private Admin admin;
@BeforeClass
public static void setupBeforeClass() throws Exception {
conf = TEST_UTIL.getConfiguration();
// Enable favored nodes based load balancer
conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
LoadOnlyFavoredStochasticBalancer.class, LoadBalancer.class);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 30000);
conf.setInt("hbase.master.balancer.stochastic.moveCost", 0);
conf.setBoolean("hbase.master.balancer.stochastic.execute.maxSteps", true);
conf.set(BaseLoadBalancer.TABLES_ON_MASTER, "none");
}
@Before
public void startCluster() throws Exception {
TEST_UTIL.startMiniCluster(SLAVES);
TEST_UTIL.getDFSCluster().waitClusterUp();
TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(120*1000);
admin = TEST_UTIL.getAdmin();
admin.setBalancerRunning(false, true);
}
@After
public void stopCluster() throws Exception {
TEST_UTIL.cleanupTestDir();
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testPickers() throws Exception {
TableName tableName = TableName.valueOf("testPickers");
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGIONS);
TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY);
admin.flush(tableName);
ServerName masterServerName = TEST_UTIL.getMiniHBaseCluster().getServerHoldingMeta();
final ServerName mostLoadedServer = getRSWithMaxRegions(Lists.newArrayList(masterServerName));
assertNotNull(mostLoadedServer);
int numRegions = admin.getOnlineRegions(mostLoadedServer).size();
ServerName source = getRSWithMaxRegions(Lists.newArrayList(masterServerName, mostLoadedServer));
assertNotNull(source);
int regionsToMove = admin.getOnlineRegions(source).size()/2;
List<HRegionInfo> hris = admin.getOnlineRegions(source);
for (int i = 0; i < regionsToMove; i++) {
admin.move(hris.get(i).getEncodedNameAsBytes(), Bytes.toBytes(mostLoadedServer.getServerName()));
LOG.info("Moving region: " + hris.get(i).getRegionNameAsString() + " to " + mostLoadedServer);
}
final int finalRegions = numRegions + regionsToMove;
TEST_UTIL.waitUntilNoRegionsInTransition(60000);
TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
int numRegions = TEST_UTIL.getAdmin().getOnlineRegions(mostLoadedServer).size();
return (numRegions == finalRegions);
}
});
TEST_UTIL.getHBaseCluster().startRegionServerAndWait(60000);
Map<ServerName, List<HRegionInfo>> serverAssignments = Maps.newHashMap();
ClusterStatus status = admin.getClusterStatus();
for (ServerName sn : status.getServers()) {
if (!ServerName.isSameHostnameAndPort(sn, masterServerName)) {
serverAssignments.put(sn, admin.getOnlineRegions(sn));
}
}
RegionLocationFinder regionFinder = new RegionLocationFinder();
regionFinder.setClusterStatus(admin.getClusterStatus());
regionFinder.setConf(conf);
regionFinder.setServices(TEST_UTIL.getMiniHBaseCluster().getMaster());
Cluster cluster = new Cluster(serverAssignments, null, regionFinder, new RackManager(conf));
LoadOnlyFavoredStochasticBalancer balancer = (LoadOnlyFavoredStochasticBalancer) TEST_UTIL
.getMiniHBaseCluster().getMaster().getLoadBalancer();
FavoredNodesManager fnm = TEST_UTIL.getMiniHBaseCluster().getMaster().getFavoredNodesManager();
cluster.sortServersByRegionCount();
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
LOG.info("Servers sorted by region count:" + Arrays.toString(servers));
LOG.info("Cluster dump: " + cluster);
if (!mostLoadedServer.equals(cluster.servers[servers[servers.length -1]])) {
LOG.error("Most loaded server: " + mostLoadedServer + " does not match: "
+ cluster.servers[servers[servers.length -1]]);
}
assertEquals(mostLoadedServer, cluster.servers[servers[servers.length -1]]);
FavoredStochasticBalancer.FavoredNodeLoadPicker loadPicker = balancer.new FavoredNodeLoadPicker();
boolean userRegionPicked = false;
for (int i = 0; i < 100; i++) {
if (userRegionPicked) {
break;
} else {
Cluster.Action action = loadPicker.generate(cluster);
if (action.type == Cluster.Action.Type.MOVE_REGION) {
Cluster.MoveRegionAction moveRegionAction = (Cluster.MoveRegionAction) action;
HRegionInfo region = cluster.regions[moveRegionAction.region];
assertNotEquals(-1, moveRegionAction.toServer);
ServerName destinationServer = cluster.servers[moveRegionAction.toServer];
assertEquals(cluster.servers[moveRegionAction.fromServer], mostLoadedServer);
if (!region.getTable().isSystemTable()) {
List<ServerName> favNodes = fnm.getFavoredNodes(region);
assertTrue(favNodes.contains(ServerName.valueOf(destinationServer.getHostAndPort(), -1)));
userRegionPicked = true;
}
}
}
}
assertTrue("load picker did not pick expected regions in 100 iterations.", userRegionPicked);
}
private ServerName getRSWithMaxRegions(ArrayList<ServerName> excludeNodes) throws IOException {
int maxRegions = 0;
ServerName maxLoadedServer = null;
for (ServerName sn : admin.getClusterStatus().getServers()) {
if (admin.getOnlineRegions(sn).size() > maxRegions) {
if (excludeNodes == null || !doesMatchExcludeNodes(excludeNodes, sn)) {
maxRegions = admin.getOnlineRegions(sn).size();
maxLoadedServer = sn;
}
}
}
return maxLoadedServer;
}
private boolean doesMatchExcludeNodes(ArrayList<ServerName> excludeNodes, ServerName sn) {
for (ServerName excludeSN : excludeNodes) {
if (ServerName.isSameHostnameAndPort(sn, excludeSN)) {
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,544 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@Category(MediumTests.class)
public class TestFavoredStochasticLoadBalancer extends BalancerTestBase {
private static final Log LOG = LogFactory.getLog(TestFavoredStochasticLoadBalancer.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final int SLAVES = 8;
private static final int REGION_NUM = SLAVES * 3;
private Admin admin;
private HMaster master;
private MiniHBaseCluster cluster;
@BeforeClass
public static void setupBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// Enable the favored nodes based load balancer
conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
LoadOnlyFavoredStochasticBalancer.class, LoadBalancer.class);
}
@Before
public void startCluster() throws Exception {
TEST_UTIL.startMiniCluster(SLAVES);
TEST_UTIL.getDFSCluster().waitClusterUp();
cluster = TEST_UTIL.getMiniHBaseCluster();
master = TEST_UTIL.getMiniHBaseCluster().getMaster();
admin = TEST_UTIL.getAdmin();
admin.setBalancerRunning(false, true);
}
@After
public void stopCluster() throws Exception {
TEST_UTIL.cleanupTestDir();
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testBasicBalance() throws Exception {
TableName tableName = TableName.valueOf("testBasicBalance");
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
TEST_UTIL.waitTableAvailable(tableName);
TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY);
admin.flush(tableName);
compactTable(tableName);
JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServerAndWait(10000);
JVMClusterUtil.RegionServerThread rs2 = cluster.startRegionServerAndWait(10000);
// Now try to run balance, and verify no regions are moved to the 2 region servers recently
// started.
admin.setBalancerRunning(true, true);
assertTrue("Balancer did not run", admin.balancer());
TEST_UTIL.waitUntilNoRegionsInTransition(120000);
List<HRegionInfo> hris = admin.getOnlineRegions(rs1.getRegionServer().getServerName());
for (HRegionInfo hri : hris) {
assertFalse("New RS contains regions belonging to table: " + tableName,
hri.getTable().equals(tableName));
}
hris = admin.getOnlineRegions(rs2.getRegionServer().getServerName());
for (HRegionInfo hri : hris) {
assertFalse("New RS contains regions belonging to table: " + tableName,
hri.getTable().equals(tableName));
}
}
@Test
public void testRoundRobinAssignment() throws Exception {
TableName tableName = TableName.valueOf("testRoundRobinAssignment");
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
TEST_UTIL.waitTableAvailable(tableName);
TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY);
admin.flush(tableName);
LoadBalancer balancer = master.getLoadBalancer();
List<HRegionInfo> regions = admin.getTableRegions(tableName);
regions.addAll(admin.getTableRegions(TableName.META_TABLE_NAME));
regions.addAll(admin.getTableRegions(TableName.NAMESPACE_TABLE_NAME));
List<ServerName> servers = Lists.newArrayList(admin.getClusterStatus().getServers());
Map<ServerName, List<HRegionInfo>> map = balancer.roundRobinAssignment(regions, servers);
for (List<HRegionInfo> regionInfos : map.values()) {
regions.removeAll(regionInfos);
}
assertEquals("No region should be missed by balancer", 0, regions.size());
}
@Test
public void testBasicRegionPlacementAndReplicaLoad() throws Exception {
String tableName = "testBasicRegionPlacement";
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
TEST_UTIL.waitTableAvailable(desc.getTableName());
FavoredNodesManager fnm = master.getFavoredNodesManager();
List<HRegionInfo> regionsOfTable = admin.getTableRegions(TableName.valueOf(tableName));
for (HRegionInfo rInfo : regionsOfTable) {
Set<ServerName> favNodes = Sets.newHashSet(fnm.getFavoredNodes(rInfo));
assertNotNull(favNodes);
assertEquals(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, favNodes.size());
}
Map<ServerName, List<Integer>> replicaLoadMap =
fnm.getReplicaLoad(Lists.newArrayList(admin.getClusterStatus().getServers()));
assertTrue("Not all replica load collected.",
admin.getClusterStatus().getServers().size() == replicaLoadMap.size());
for (Entry<ServerName, List<Integer>> entry : replicaLoadMap.entrySet()) {
assertTrue(entry.getValue().size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
assertTrue(entry.getValue().get(0) >= 0);
assertTrue(entry.getValue().get(1) >= 0);
assertTrue(entry.getValue().get(2) >= 0);
}
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
replicaLoadMap =
fnm.getReplicaLoad(Lists.newArrayList(admin.getClusterStatus().getServers()));
assertTrue("replica load found " + replicaLoadMap.size() + " instead of 0.",
replicaLoadMap.size() == admin.getClusterStatus().getServers().size());
}
@Test
public void testRandomAssignmentWithNoFavNodes() throws Exception {
final String tableName = "testRandomAssignmentWithNoFavNodes";
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
admin.createTable(desc);
TEST_UTIL.waitTableAvailable(desc.getTableName());
HRegionInfo hri = admin.getTableRegions(TableName.valueOf(tableName)).get(0);
FavoredNodesManager fnm = master.getFavoredNodesManager();
fnm.deleteFavoredNodesForRegions(Lists.newArrayList(hri));
assertNull("Favored nodes not found null after delete", fnm.getFavoredNodes(hri));
LoadBalancer balancer = master.getLoadBalancer();
ServerName destination = balancer.randomAssignment(hri, Lists.newArrayList(admin
.getClusterStatus().getServers()));
assertNotNull(destination);
List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
assertNotNull(favoredNodes);
boolean containsFN = false;
for (ServerName sn : favoredNodes) {
if (ServerName.isSameHostnameAndPort(destination, sn)) {
containsFN = true;
}
}
assertTrue("Destination server does not belong to favored nodes.", containsFN);
}
@Test
public void testBalancerWithoutFavoredNodes() throws Exception {
TableName tableName = TableName.valueOf("testBalancerWithoutFavoredNodes");
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
TEST_UTIL.waitTableAvailable(tableName);
final HRegionInfo region = admin.getTableRegions(tableName).get(0);
LOG.info("Region thats supposed to be in transition: " + region);
FavoredNodesManager fnm = master.getFavoredNodesManager();
List<ServerName> currentFN = fnm.getFavoredNodes(region);
assertNotNull(currentFN);
fnm.deleteFavoredNodesForRegions(Lists.newArrayList(region));
RegionStates regionStates = master.getAssignmentManager().getRegionStates();
admin.setBalancerRunning(true, true);
// Balancer should unassign the region
assertTrue("Balancer did not run", admin.balancer());
TEST_UTIL.waitUntilNoRegionsInTransition();
admin.assign(region.getEncodedNameAsBytes());
TEST_UTIL.waitUntilNoRegionsInTransition(60000);
currentFN = fnm.getFavoredNodes(region);
assertNotNull(currentFN);
assertEquals("Expected number of FN not present",
FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, currentFN.size());
assertTrue("Balancer did not run", admin.balancer());
TEST_UTIL.waitUntilNoRegionsInTransition(60000);
checkFavoredNodeAssignments(tableName, fnm, regionStates);
}
@Test
public void testMisplacedRegions() throws Exception {
TableName tableName = TableName.valueOf("testMisplacedRegions");
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
TEST_UTIL.waitTableAvailable(tableName);
final HRegionInfo misplacedRegion = admin.getTableRegions(tableName).get(0);
FavoredNodesManager fnm = master.getFavoredNodesManager();
List<ServerName> currentFN = fnm.getFavoredNodes(misplacedRegion);
assertNotNull(currentFN);
List<ServerName> serversForNewFN = Lists.newArrayList();
for (ServerName sn : admin.getClusterStatus().getServers()) {
serversForNewFN.add(ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE));
}
for (ServerName sn : currentFN) {
serversForNewFN.remove(sn);
}
FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(serversForNewFN, conf);
helper.initialize();
List<ServerName> newFavoredNodes = helper.generateFavoredNodes(misplacedRegion);
assertNotNull(newFavoredNodes);
assertEquals(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, newFavoredNodes.size());
Map<HRegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
regionFNMap.put(misplacedRegion, newFavoredNodes);
fnm.updateFavoredNodes(regionFNMap);
RegionStates regionStates = master.getAssignmentManager().getRegionStates();
final ServerName current = regionStates.getRegionServerOfRegion(misplacedRegion);
assertNull("Misplaced region is still hosted on favored node, not expected.",
FavoredNodesPlan.getFavoredServerPosition(fnm.getFavoredNodes(misplacedRegion), current));
admin.setBalancerRunning(true, true);
assertTrue("Balancer did not run", admin.balancer());
TEST_UTIL.waitFor(120000, 30000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
ServerName host = regionStates.getRegionServerOfRegion(misplacedRegion);
return !ServerName.isSameHostnameAndPort(host, current);
}
});
checkFavoredNodeAssignments(tableName, fnm, regionStates);
}
@Test
public void test2FavoredNodesDead() throws Exception {
TableName tableName = TableName.valueOf("testAllFavoredNodesDead");
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
TEST_UTIL.waitTableAvailable(tableName);
final HRegionInfo region = admin.getTableRegions(tableName).get(0);
LOG.info("Region that's supposed to be in transition: " + region);
FavoredNodesManager fnm = master.getFavoredNodesManager();
List<ServerName> currentFN = fnm.getFavoredNodes(region);
assertNotNull(currentFN);
List<ServerName> serversToStop = Lists.newArrayList(currentFN);
serversToStop.remove(currentFN.get(0));
// Lets kill 2 FN for the region. All regions should still be assigned
stopServersAndWaitUntilProcessed(serversToStop);
TEST_UTIL.waitUntilNoRegionsInTransition();
RegionStates regionStates = master.getAssignmentManager().getRegionStates();
TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return regionStates.getRegionState(region).isOpened();
}
});
assertEquals("Not all regions are online", REGION_NUM, admin.getTableRegions(tableName).size());
admin.setBalancerRunning(true, true);
assertTrue("Balancer did not run", admin.balancer());
TEST_UTIL.waitUntilNoRegionsInTransition(60000);
checkFavoredNodeAssignments(tableName, fnm, regionStates);
}
@Test
public void testAllFavoredNodesDead() throws Exception {
TableName tableName = TableName.valueOf("testAllFavoredNodesDead");
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
TEST_UTIL.waitTableAvailable(tableName);
final HRegionInfo region = admin.getTableRegions(tableName).get(0);
LOG.info("Region that's supposed to be in transition: " + region);
FavoredNodesManager fnm = master.getFavoredNodesManager();
List<ServerName> currentFN = fnm.getFavoredNodes(region);
assertNotNull(currentFN);
// Lets kill all the RS that are favored nodes for this region.
stopServersAndWaitUntilProcessed(currentFN);
RegionStates regionStates = master.getAssignmentManager().getRegionStates();
TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return regionStates.getRegionState(region).isFailedOpen();
}
});
assertTrue("Region: " + region + " should be RIT",
regionStates.getRegionState(region).isFailedOpen());
// Regenerate FN and assign, everything else should be fine
List<ServerName> serversForNewFN = Lists.newArrayList();
for (ServerName sn : admin.getClusterStatus().getServers()) {
serversForNewFN.add(ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE));
}
FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(serversForNewFN, conf);
helper.initialize();
for (RegionState regionState : regionStates.getRegionsInTransition()) {
HRegionInfo regionInfo = regionState.getRegion();
List<ServerName> newFavoredNodes = helper.generateFavoredNodes(regionInfo);
assertNotNull(newFavoredNodes);
assertEquals(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, newFavoredNodes.size());
LOG.info("Region: " + regionInfo.getEncodedName() + " FN: " + newFavoredNodes);
Map<HRegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
regionFNMap.put(regionInfo, newFavoredNodes);
fnm.updateFavoredNodes(regionFNMap);
LOG.info("Assigning region: " + regionInfo.getEncodedName());
admin.assign(regionInfo.getEncodedNameAsBytes());
}
TEST_UTIL.waitUntilNoRegionsInTransition(60000);
assertEquals("Not all regions are online", REGION_NUM, admin.getTableRegions(tableName).size());
admin.setBalancerRunning(true, true);
assertTrue("Balancer did not run", admin.balancer());
TEST_UTIL.waitUntilNoRegionsInTransition(60000);
checkFavoredNodeAssignments(tableName, fnm, regionStates);
}
@Test
public void testAllFavoredNodesDeadMasterRestarted() throws Exception {
TableName tableName = TableName.valueOf("testAllFavoredNodesDeadMasterRestarted");
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
TEST_UTIL.waitTableAvailable(tableName);
final HRegionInfo region = admin.getTableRegions(tableName).get(0);
LOG.info("Region that's supposed to be in transition: " + region);
FavoredNodesManager fnm = master.getFavoredNodesManager();
List<ServerName> currentFN = fnm.getFavoredNodes(region);
assertNotNull(currentFN);
// Lets kill all the RS that are favored nodes for this region.
stopServersAndWaitUntilProcessed(currentFN);
RegionStates regionStatesBeforeMaster = master.getAssignmentManager().getRegionStates();
TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return regionStatesBeforeMaster.getRegionState(region).isFailedOpen();
}
});
assertTrue("Region: " + region + " should be RIT",
regionStatesBeforeMaster.getRegionState(region).isFailedOpen());
List<HRegionInfo> rit = Lists.newArrayList();
for (RegionState regionState : regionStatesBeforeMaster.getRegionsInTransition()) {
HRegionInfo regionInfo = regionState.getRegion();
LOG.debug("Region in transition after stopping FN's: " + regionInfo);
rit.add(regionInfo);
assertTrue("Region: " + regionInfo + " should be RIT",
regionStatesBeforeMaster.getRegionState(regionInfo).isFailedOpen());
assertEquals("Region: " + regionInfo + " does not belong to table: " + tableName,
tableName, regionInfo.getTable());
}
Configuration conf = cluster.getConf();
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
SLAVES - FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
cluster.stopMaster(master.getServerName());
cluster.waitForMasterToStop(master.getServerName(), 60000);
cluster.startMaster();
cluster.waitForActiveAndReadyMaster();
master = cluster.getMaster();
fnm = master.getFavoredNodesManager();
RegionStates regionStates = master.getAssignmentManager().getRegionStates();
assertTrue("Region: " + region + " should be RIT",
regionStates.getRegionState(region).isFailedOpen());
for (HRegionInfo regionInfo : rit) {
assertTrue("Region: " + regionInfo + " should be RIT",
regionStates.getRegionState(regionInfo).isFailedOpen());
}
// Regenerate FN and assign, everything else should be fine
List<ServerName> serversForNewFN = Lists.newArrayList();
for (ServerName sn : admin.getClusterStatus().getServers()) {
serversForNewFN.add(ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE));
}
FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(serversForNewFN, conf);
helper.initialize();
for (HRegionInfo regionInfo : rit) {
List<ServerName> newFavoredNodes = helper.generateFavoredNodes(regionInfo);
assertNotNull(newFavoredNodes);
assertEquals(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, newFavoredNodes.size());
LOG.info("Region: " + regionInfo.getEncodedName() + " FN: " + newFavoredNodes);
Map<HRegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
regionFNMap.put(regionInfo, newFavoredNodes);
fnm.updateFavoredNodes(regionFNMap);
LOG.info("Assigning region: " + regionInfo.getEncodedName());
admin.assign(regionInfo.getEncodedNameAsBytes());
}
TEST_UTIL.waitUntilNoRegionsInTransition(60000);
assertEquals("Not all regions are online", REGION_NUM, admin.getTableRegions(tableName).size());
admin.setBalancerRunning(true, true);
assertTrue("Balancer did not run", admin.balancer());
TEST_UTIL.waitUntilNoRegionsInTransition(60000);
checkFavoredNodeAssignments(tableName, fnm, regionStates);
}
private void checkFavoredNodeAssignments(TableName tableName, FavoredNodesManager fnm,
RegionStates regionStates) throws IOException {
for (HRegionInfo hri : admin.getTableRegions(tableName)) {
ServerName host = regionStates.getRegionServerOfRegion(hri);
assertNotNull("Region: " + hri.getEncodedName() + " not on FN, current: " + host
+ " FN list: " + fnm.getFavoredNodes(hri),
FavoredNodesPlan.getFavoredServerPosition(fnm.getFavoredNodes(hri), host));
}
}
private void stopServersAndWaitUntilProcessed(List<ServerName> currentFN) throws Exception {
for (ServerName sn : currentFN) {
for (JVMClusterUtil.RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
if (ServerName.isSameHostnameAndPort(sn, rst.getRegionServer().getServerName())) {
LOG.info("Shutting down server: " + sn);
cluster.stopRegionServer(rst.getRegionServer().getServerName());
cluster.waitForRegionServerToStop(rst.getRegionServer().getServerName(), 60000);
}
}
}
// Wait until dead servers are processed.
TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return !master.getServerManager().areDeadServersInProgress();
}
});
assertEquals("Not all servers killed",
SLAVES - currentFN.size(), cluster.getLiveRegionServerThreads().size());
}
private void compactTable(TableName tableName) throws IOException {
for(JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
for(Region region : t.getRegionServer().getOnlineRegions(tableName)) {
region.compact(true);
}
}
}
}