diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeAssignmentHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeAssignmentHelper.java index 48745caef2d..bdec8dd9741 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeAssignmentHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeAssignmentHelper.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.favored; +import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -29,6 +31,7 @@ import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,7 +56,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.protobuf.InvalidProtocolBufferException; /** * Helper class for {@link FavoredNodeLoadBalancer} that has all the intelligence for racks, @@ -224,7 +226,7 @@ public class FavoredNodeAssignmentHelper { // If there were fewer servers in one rack, say r3, which had 3 servers, one possible // placement could be r2:s5, , r4:s5, r1:s5, r2:s6, ... // The regions should be distributed proportionately to the racksizes - void placePrimaryRSAsRoundRobin(Map> assignmentMap, + public void placePrimaryRSAsRoundRobin(Map> assignmentMap, Map primaryRSMap, List regions) { List rackList = new ArrayList<>(rackToRegionServerMap.size()); rackList.addAll(rackToRegionServerMap.keySet()); @@ -236,9 +238,8 @@ public class FavoredNodeAssignmentHelper { } } int numIterations = 0; - int firstServerIndex = random.nextInt(maxRackSize); // Initialize the current processing host index. 
- int serverIndex = firstServerIndex; + int serverIndex = random.nextInt(maxRackSize); for (HRegionInfo regionInfo : regions) { List currentServerList; String rackName; @@ -282,7 +283,7 @@ public class FavoredNodeAssignmentHelper { } } - Map placeSecondaryAndTertiaryRS( + public Map placeSecondaryAndTertiaryRS( Map primaryRSMap) { Map secondaryAndTertiaryMap = new HashMap<>(); for (Map.Entry entry : primaryRSMap.entrySet()) { @@ -291,15 +292,7 @@ public class FavoredNodeAssignmentHelper { ServerName primaryRS = entry.getValue(); try { // Create the secondary and tertiary region server pair object. - ServerName[] favoredNodes; - // Get the rack for the primary region server - String primaryRack = getRackOfServer(primaryRS); - - if (getTotalNumberOfRacks() == 1) { - favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack); - } else { - favoredNodes = multiRackCase(regionInfo, primaryRS, primaryRack); - } + ServerName[] favoredNodes = getSecondaryAndTertiary(regionInfo, primaryRS); if (favoredNodes != null) { secondaryAndTertiaryMap.put(regionInfo, favoredNodes); LOG.debug("Place the secondary and tertiary region server for region " @@ -314,6 +307,20 @@ public class FavoredNodeAssignmentHelper { return secondaryAndTertiaryMap; } + public ServerName[] getSecondaryAndTertiary(HRegionInfo regionInfo, ServerName primaryRS) + throws IOException { + + ServerName[] favoredNodes;// Get the rack for the primary region server + String primaryRack = getRackOfServer(primaryRS); + + if (getTotalNumberOfRacks() == 1) { + favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack); + } else { + favoredNodes = multiRackCase(regionInfo, primaryRS, primaryRack); + } + return favoredNodes; + } + private Map> mapRSToPrimaries( Map primaryRSMap) { Map> primaryServerMap = new HashMap<>(); @@ -536,7 +543,7 @@ public class FavoredNodeAssignmentHelper { return new ServerName[]{ secondaryRS, tertiaryRS }; } - boolean canPlaceFavoredNodes() { + public boolean 
canPlaceFavoredNodes() { return (this.servers.size() >= FAVORED_NODES_NUM); } @@ -554,8 +561,7 @@ public class FavoredNodeAssignmentHelper { * @param rack rack from a server is needed * @param skipServerSet the server shouldn't belong to this set */ - protected ServerName getOneRandomServer(String rack, Set skipServerSet) - throws IOException { + protected ServerName getOneRandomServer(String rack, Set skipServerSet) { // Is the rack valid? Do we recognize it? if (rack == null || getServersFromRack(rack) == null || @@ -759,7 +765,7 @@ public class FavoredNodeAssignmentHelper { * Choose a random server as primary and then choose secondary and tertiary FN so its spread * across two racks. */ - List generateFavoredNodes(HRegionInfo hri) throws IOException { + public List generateFavoredNodes(HRegionInfo hri) throws IOException { List favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM); ServerName primary = servers.get(random.nextInt(servers.size())); @@ -780,6 +786,54 @@ public class FavoredNodeAssignmentHelper { } } + public Map> generateFavoredNodesRoundRobin( + Map> assignmentMap, List regions) + throws IOException { + + if (regions.size() > 0) { + if (canPlaceFavoredNodes()) { + Map primaryRSMap = new HashMap<>(); + // Lets try to have an equal distribution for primary favored node + placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions); + return generateFavoredNodes(primaryRSMap); + + } else { + throw new HBaseIOException("Not enough nodes to generate favored nodes"); + } + } + return null; + } + + /* + * Generate favored nodes for a set of regions when we know where they are currently hosted. 
+ */ + private Map> generateFavoredNodes( + Map primaryRSMap) { + + Map> generatedFavNodes = new HashMap<>(); + Map secondaryAndTertiaryRSMap = + placeSecondaryAndTertiaryRS(primaryRSMap); + + for (Entry entry : primaryRSMap.entrySet()) { + List favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM); + HRegionInfo region = entry.getKey(); + ServerName primarySN = entry.getValue(); + favoredNodesForRegion.add(ServerName.valueOf(primarySN.getHostname(), primarySN.getPort(), + NON_STARTCODE)); + ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(region); + if (secondaryAndTertiaryNodes != null) { + favoredNodesForRegion.add(ServerName.valueOf( + secondaryAndTertiaryNodes[0].getHostname(), secondaryAndTertiaryNodes[0].getPort(), + NON_STARTCODE)); + favoredNodesForRegion.add(ServerName.valueOf( + secondaryAndTertiaryNodes[1].getHostname(), secondaryAndTertiaryNodes[1].getPort(), + NON_STARTCODE)); + } + generatedFavNodes.put(region, favoredNodesForRegion); + } + return generatedFavNodes; + } + /* * Get the rack of server from local mapping when present, saves lookup by the RackManager. 
*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java index 6e7bf0e4f5d..00a29b206f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java @@ -158,7 +158,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored @Override public Map> roundRobinAssignment(List regions, - List servers) { + List servers) throws HBaseIOException { Map> assignmentMap; try { FavoredNodeAssignmentHelper assignmentHelper = @@ -201,7 +201,8 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored } @Override - public ServerName randomAssignment(HRegionInfo regionInfo, List servers) { + public ServerName randomAssignment(HRegionInfo regionInfo, List servers) + throws HBaseIOException { try { FavoredNodeAssignmentHelper assignmentHelper = new FavoredNodeAssignmentHelper(servers, rackManager); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java index 7aef70b1e9e..be4aad5e7e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java @@ -18,9 +18,11 @@ */ package org.apache.hadoop.hbase.favored; +import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM; import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY; import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY; import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY; +import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE; import 
java.io.IOException; import java.util.ArrayList; @@ -28,6 +30,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,12 +40,14 @@ import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.net.NetUtils; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; /** @@ -66,6 +71,7 @@ public class FavoredNodesManager { private Map> teritiaryRSToRegionMap; private MasterServices masterServices; + private RackManager rackManager; /** * Datanode port to be used for Favored Nodes. @@ -78,6 +84,7 @@ public class FavoredNodesManager { this.primaryRSToRegionMap = new HashMap<>(); this.secondaryRSToRegionMap = new HashMap<>(); this.teritiaryRSToRegionMap = new HashMap<>(); + this.rackManager = new RackManager(masterServices.getConfiguration()); } public void initialize(SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment) @@ -113,6 +120,22 @@ public class FavoredNodesManager { return !regionInfo.isSystemTable(); } + /** + * Filter and return regions for which favored nodes is not applicable. 
+ * + * @param regions - collection of regions + * @return set of regions for which favored nodes is not applicable + */ + public static Set filterNonFNApplicableRegions(Collection regions) { + Set fnRegions = Sets.newHashSet(); + for (HRegionInfo regionInfo : regions) { + if (!isFavoredNodeApplicable(regionInfo)) { + fnRegions.add(regionInfo); + } + } + return fnRegions; + } + /* * This should only be used when sending FN information to the region servers. Instead of * sending the region server port, we use the datanode port. This helps in centralizing the DN @@ -126,7 +149,7 @@ public class FavoredNodesManager { List fnWithDNPort = Lists.newArrayList(); for (ServerName sn : getFavoredNodes(regionInfo)) { fnWithDNPort.add(ServerName.valueOf(sn.getHostname(), datanodeDataTransferPort, - ServerName.NON_STARTCODE)); + NON_STARTCODE)); } return fnWithDNPort; } @@ -152,19 +175,19 @@ public class FavoredNodesManager { + regionInfo.getRegionNameAsString() + " with " + servers); } - if (servers.size() != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) { - throw new IOException("At least " + FavoredNodeAssignmentHelper.FAVORED_NODES_NUM + if (servers.size() != FAVORED_NODES_NUM) { + throw new IOException("At least " + FAVORED_NODES_NUM + " favored nodes should be present for region : " + regionInfo.getEncodedName() + " current FN servers:" + servers); } List serversWithNoStartCodes = Lists.newArrayList(); for (ServerName sn : servers) { - if (sn.getStartcode() == ServerName.NON_STARTCODE) { + if (sn.getStartcode() == NON_STARTCODE) { serversWithNoStartCodes.add(sn); } else { serversWithNoStartCodes.add(ServerName.valueOf(sn.getHostname(), sn.getPort(), - ServerName.NON_STARTCODE)); + NON_STARTCODE)); } } regionToFavoredNodes.put(regionInfo, serversWithNoStartCodes); @@ -186,7 +209,7 @@ public class FavoredNodesManager { private synchronized void addToReplicaLoad(HRegionInfo hri, List servers) { ServerName serverToUse = 
ServerName.valueOf(servers.get(PRIMARY.ordinal()).getHostAndPort(), - ServerName.NON_STARTCODE); + NON_STARTCODE); List regionList = primaryRSToRegionMap.get(serverToUse); if (regionList == null) { regionList = new ArrayList<>(); @@ -195,7 +218,7 @@ public class FavoredNodesManager { primaryRSToRegionMap.put(serverToUse, regionList); serverToUse = ServerName - .valueOf(servers.get(SECONDARY.ordinal()).getHostAndPort(), ServerName.NON_STARTCODE); + .valueOf(servers.get(SECONDARY.ordinal()).getHostAndPort(), NON_STARTCODE); regionList = secondaryRSToRegionMap.get(serverToUse); if (regionList == null) { regionList = new ArrayList<>(); @@ -204,7 +227,7 @@ public class FavoredNodesManager { secondaryRSToRegionMap.put(serverToUse, regionList); serverToUse = ServerName.valueOf(servers.get(TERTIARY.ordinal()).getHostAndPort(), - ServerName.NON_STARTCODE); + NON_STARTCODE); regionList = teritiaryRSToRegionMap.get(serverToUse); if (regionList == null) { regionList = new ArrayList<>(); @@ -213,6 +236,39 @@ public class FavoredNodesManager { teritiaryRSToRegionMap.put(serverToUse, regionList); } + /* + * Get the replica count for the servers provided. + * + * For each server, replica count includes three counts for primary, secondary and tertiary. + * If a server is the primary favored node for 10 regions, secondary for 5 and tertiary + * for 1, then the list would be [10, 5, 1]. If the server is newly added to the cluster is + * not a favored node for any region, the replica count would be [0, 0, 0]. 
+ */ + public synchronized Map> getReplicaLoad(List servers) { + Map> result = Maps.newHashMap(); + for (ServerName sn : servers) { + ServerName serverWithNoStartCode = ServerName.valueOf(sn.getHostAndPort(), NON_STARTCODE); + List countList = Lists.newArrayList(); + if (primaryRSToRegionMap.containsKey(serverWithNoStartCode)) { + countList.add(primaryRSToRegionMap.get(serverWithNoStartCode).size()); + } else { + countList.add(0); + } + if (secondaryRSToRegionMap.containsKey(serverWithNoStartCode)) { + countList.add(secondaryRSToRegionMap.get(serverWithNoStartCode).size()); + } else { + countList.add(0); + } + if (teritiaryRSToRegionMap.containsKey(serverWithNoStartCode)) { + countList.add(teritiaryRSToRegionMap.get(serverWithNoStartCode).size()); + } else { + countList.add(0); + } + result.put(sn, countList); + } + return result; + } + public synchronized void deleteFavoredNodesForRegions(Collection regionInfoList) { for (HRegionInfo hri : regionInfoList) { List favNodes = getFavoredNodes(hri); @@ -230,4 +286,8 @@ public class FavoredNodesManager { } } } + + public RackManager getRackManager() { + return rackManager; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPromoter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPromoter.java index 90f29db4c7e..0201143d73a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPromoter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPromoter.java @@ -27,6 +27,9 @@ import org.apache.hadoop.hbase.ServerName; @InterfaceAudience.Private public interface FavoredNodesPromoter { + /* Try and assign regions even if favored nodes are dead */ + String FAVORED_ALWAYS_ASSIGN_REGIONS = "hbase.favored.assignment.always.assign"; + void generateFavoredNodesForDaughter(List servers, HRegionInfo parent, HRegionInfo hriA, HRegionInfo hriB) throws IOException; diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 0f1b1a2da93..196e693a986 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -1224,7 +1224,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { */ @Override public Map> roundRobinAssignment(List regions, - List servers) { + List servers) throws HBaseIOException { metricsBalancer.incrMiscInvocations(); Map> assignments = assignMasterRegions(regions, servers); if (assignments != null && !assignments.isEmpty()) { @@ -1335,7 +1335,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { * Used to assign a single region to a random server. */ @Override - public ServerName randomAssignment(HRegionInfo regionInfo, List servers) { + public ServerName randomAssignment(HRegionInfo regionInfo, List servers) + throws HBaseIOException { metricsBalancer.incrMiscInvocations(); if (servers != null && servers.contains(masterServerName)) { if (shouldBeOnMaster(regionInfo)) { @@ -1384,7 +1385,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { */ @Override public Map> retainAssignment(Map regions, - List servers) { + List servers) throws HBaseIOException { // Update metrics metricsBalancer.incrMiscInvocations(); Map> assignments diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java new file mode 100644 index 00000000000..fd98c9cc7bf --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java @@ -0,0 +1,730 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM; +import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY; +import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY; +import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY; +import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper; +import org.apache.hadoop.hbase.favored.FavoredNodesManager; +import org.apache.hadoop.hbase.favored.FavoredNodesPlan; +import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position; +import 
org.apache.hadoop.hbase.favored.FavoredNodesPromoter; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.util.Pair; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that + * assigns favored nodes for each region. There is a Primary RegionServer that hosts + * the region, and then there is Secondary and Tertiary RegionServers. Currently, the + * favored nodes information is used in creating HDFS files - the Primary RegionServer + * passes the primary, secondary, tertiary node addresses as hints to the + * DistributedFileSystem API for creating files on the filesystem. These nodes are + * treated as hints by the HDFS to place the blocks of the file. This alleviates the + * problem to do with reading from remote nodes (since we can make the Secondary + * RegionServer as the new Primary RegionServer) after a region is recovered. This + * should help provide consistent read latencies for the regions even when their + * primary region servers die. 
This provides two + * {@link org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CandidateGenerator} + * + */ +public class FavoredStochasticBalancer extends StochasticLoadBalancer implements + FavoredNodesPromoter { + + private static final Log LOG = LogFactory.getLog(FavoredStochasticBalancer.class); + private FavoredNodesManager fnm; + + @Override + public void initialize() throws HBaseIOException { + configureGenerators(); + super.initialize(); + } + + protected void configureGenerators() { + List fnPickers = new ArrayList<>(2); + fnPickers.add(new FavoredNodeLoadPicker()); + fnPickers.add(new FavoredNodeLocalityPicker()); + setCandidateGenerators(fnPickers); + } + + @Override + public void setMasterServices(MasterServices masterServices) { + super.setMasterServices(masterServices); + fnm = masterServices.getFavoredNodesManager(); + } + + /* + * Round robin assignment: Segregate the regions into two types: + * + * 1. The regions that have favored node assignment where at least one of the favored node + * is still alive. In this case, try to adhere to the current favored nodes assignment as + * much as possible - i.e., if the current primary is gone, then make the secondary or + * tertiary as the new host for the region (based on their current load). Note that we don't + * change the favored node assignments here (even though one or more favored node is + * currently down). That will be done by the admin operations. + * + * 2. The regions that currently don't have favored node assignments. Generate favored nodes + * for them and then assign. Generate the primary fn in round robin fashion and generate + * secondary and tertiary as per favored nodes constraints. 
+ */ + @Override + public Map> roundRobinAssignment(List regions, + List servers) throws HBaseIOException { + + metricsBalancer.incrMiscInvocations(); + + Set regionSet = Sets.newHashSet(regions); + Map> assignmentMap = assignMasterRegions(regions, servers); + if (assignmentMap != null && !assignmentMap.isEmpty()) { + servers = new ArrayList<>(servers); + // Guarantee not to put other regions on master + servers.remove(masterServerName); + List masterRegions = assignmentMap.get(masterServerName); + if (!masterRegions.isEmpty()) { + for (HRegionInfo region: masterRegions) { + regionSet.remove(region); + } + } + } + + if (regionSet.isEmpty()) { + return assignmentMap; + } + + try { + FavoredNodeAssignmentHelper helper = + new FavoredNodeAssignmentHelper(servers, fnm.getRackManager()); + helper.initialize(); + + Set systemRegions = FavoredNodesManager.filterNonFNApplicableRegions(regionSet); + regionSet.removeAll(systemRegions); + + // Assign all system regions + Map> systemAssignments = + super.roundRobinAssignment(Lists.newArrayList(systemRegions), servers); + + // Segregate favored and non-favored nodes regions and assign accordingly. 
+ Pair>, List> segregatedRegions = + segregateRegionsAndAssignRegionsWithFavoredNodes(regionSet, servers); + Map> regionsWithFavoredNodesMap = segregatedRegions.getFirst(); + Map> regionsWithoutFN = + generateFNForRegionsWithoutFN(helper, segregatedRegions.getSecond()); + + // merge the assignment maps + mergeAssignmentMaps(assignmentMap, systemAssignments); + mergeAssignmentMaps(assignmentMap, regionsWithFavoredNodesMap); + mergeAssignmentMaps(assignmentMap, regionsWithoutFN); + + } catch (Exception ex) { + throw new HBaseIOException("Encountered exception while doing favored-nodes assignment " + + ex + " Falling back to regular assignment", ex); + } + return assignmentMap; + } + + private void mergeAssignmentMaps(Map> assignmentMap, + Map> otherAssignments) { + + if (otherAssignments == null || otherAssignments.isEmpty()) { + return; + } + + for (Entry> entry : otherAssignments.entrySet()) { + ServerName sn = entry.getKey(); + List regionsList = entry.getValue(); + if (assignmentMap.get(sn) == null) { + assignmentMap.put(sn, Lists.newArrayList(regionsList)); + } else { + assignmentMap.get(sn).addAll(regionsList); + } + } + } + + private Map> generateFNForRegionsWithoutFN( + FavoredNodeAssignmentHelper helper, List regions) throws IOException { + + Map> assignmentMap = Maps.newHashMap(); + Map> regionsNoFNMap; + + if (regions.size() > 0) { + regionsNoFNMap = helper.generateFavoredNodesRoundRobin(assignmentMap, regions); + fnm.updateFavoredNodes(regionsNoFNMap); + } + return assignmentMap; + } + + /* + * Return a pair - one with assignments when favored nodes are present and another with regions + * without favored nodes. 
+ */ + private Pair>, List> + segregateRegionsAndAssignRegionsWithFavoredNodes(Collection regions, + List onlineServers) throws HBaseIOException { + + // Since we expect FN to be present most of the time, lets create map with same size + Map> assignmentMapForFavoredNodes = + new HashMap<>(onlineServers.size()); + List regionsWithNoFavoredNodes = new ArrayList<>(); + + for (HRegionInfo region : regions) { + List favoredNodes = fnm.getFavoredNodes(region); + ServerName primaryHost = null; + ServerName secondaryHost = null; + ServerName tertiaryHost = null; + + if (favoredNodes != null && !favoredNodes.isEmpty()) { + for (ServerName s : favoredNodes) { + ServerName serverWithLegitStartCode = getServerFromFavoredNode(onlineServers, s); + if (serverWithLegitStartCode != null) { + FavoredNodesPlan.Position position = + FavoredNodesPlan.getFavoredServerPosition(favoredNodes, s); + if (Position.PRIMARY.equals(position)) { + primaryHost = serverWithLegitStartCode; + } else if (Position.SECONDARY.equals(position)) { + secondaryHost = serverWithLegitStartCode; + } else if (Position.TERTIARY.equals(position)) { + tertiaryHost = serverWithLegitStartCode; + } + } + } + assignRegionToAvailableFavoredNode(assignmentMapForFavoredNodes, region, primaryHost, + secondaryHost, tertiaryHost); + } else { + regionsWithNoFavoredNodes.add(region); + } + } + return new Pair<>(assignmentMapForFavoredNodes, regionsWithNoFavoredNodes); + } + + private void addRegionToMap(Map> assignmentMapForFavoredNodes, + HRegionInfo region, ServerName host) { + + List regionsOnServer; + if ((regionsOnServer = assignmentMapForFavoredNodes.get(host)) == null) { + regionsOnServer = Lists.newArrayList(); + assignmentMapForFavoredNodes.put(host, regionsOnServer); + } + regionsOnServer.add(region); + } + + /* + * Get the ServerName for the FavoredNode. Since FN's startcode is -1, we could want to get the + * ServerName with the correct start code from the list of provided servers. 
+ */ + private ServerName getServerFromFavoredNode(List servers, ServerName fn) { + for (ServerName server : servers) { + if (ServerName.isSameHostnameAndPort(fn, server)) { + return server; + } + } + return null; + } + + /* + * Assign the region to primary if its available. If both secondary and tertiary are available, + * assign to the host which has less load. Else assign to secondary or tertiary whichever is + * available (in that order). + */ + private void assignRegionToAvailableFavoredNode( + Map> assignmentMapForFavoredNodes, HRegionInfo region, + ServerName primaryHost, ServerName secondaryHost, ServerName tertiaryHost) { + + if (primaryHost != null) { + addRegionToMap(assignmentMapForFavoredNodes, region, primaryHost); + + } else if (secondaryHost != null && tertiaryHost != null) { + + // Assign the region to the one with a lower load (both have the desired hdfs blocks) + ServerName s; + ServerLoad tertiaryLoad = super.services.getServerManager().getLoad(tertiaryHost); + ServerLoad secondaryLoad = super.services.getServerManager().getLoad(secondaryHost); + if (secondaryLoad != null && tertiaryLoad != null) { + if (secondaryLoad.getLoad() < tertiaryLoad.getLoad()) { + s = secondaryHost; + } else { + s = tertiaryHost; + } + } else { + // We don't have one/more load, lets just choose a random node + s = RANDOM.nextBoolean() ? secondaryHost : tertiaryHost; + } + addRegionToMap(assignmentMapForFavoredNodes, region, s); + } else if (secondaryHost != null) { + addRegionToMap(assignmentMapForFavoredNodes, region, secondaryHost); + } else if (tertiaryHost != null) { + addRegionToMap(assignmentMapForFavoredNodes, region, tertiaryHost); + } else { + // No favored nodes are online, lets assign to BOGUS server + addRegionToMap(assignmentMapForFavoredNodes, region, BOGUS_SERVER_NAME); + } + } + + /* + * If we have favored nodes for a region, we will return one of the FN as destination. 
If + * favored nodes are not present for a region, we will generate and return one of the FN as + * destination. If we can't generate anything, lets fallback. + */ + @Override + public ServerName randomAssignment(HRegionInfo regionInfo, List servers) + throws HBaseIOException { + + if (servers != null && servers.contains(masterServerName)) { + if (shouldBeOnMaster(regionInfo)) { + metricsBalancer.incrMiscInvocations(); + return masterServerName; + } + servers = new ArrayList<>(servers); + // Guarantee not to put other regions on master + servers.remove(masterServerName); + } + + ServerName destination = null; + if (!FavoredNodesManager.isFavoredNodeApplicable(regionInfo)) { + return super.randomAssignment(regionInfo, servers); + } + + metricsBalancer.incrMiscInvocations(); + + List favoredNodes = fnm.getFavoredNodes(regionInfo); + if (favoredNodes == null || favoredNodes.isEmpty()) { + // Generate new favored nodes and return primary + FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, getConf()); + helper.initialize(); + try { + favoredNodes = helper.generateFavoredNodes(regionInfo); + updateFavoredNodesForRegion(regionInfo, favoredNodes); + + } catch (IOException e) { + LOG.warn("Encountered exception while doing favored-nodes (random)assignment " + e); + throw new HBaseIOException(e); + } + } + + List onlineServers = getOnlineFavoredNodes(servers, favoredNodes); + if (onlineServers.size() > 0) { + destination = onlineServers.get(RANDOM.nextInt(onlineServers.size())); + } + + boolean alwaysAssign = getConf().getBoolean(FAVORED_ALWAYS_ASSIGN_REGIONS, true); + if (destination == null && alwaysAssign) { + LOG.warn("Can't generate FN for region: " + regionInfo + " falling back"); + destination = super.randomAssignment(regionInfo, servers); + } + return destination; + } + + private void updateFavoredNodesForRegion(HRegionInfo regionInfo, List newFavoredNodes) + throws IOException { + Map> regionFNMap = Maps.newHashMap(); + 
regionFNMap.put(regionInfo, newFavoredNodes); + fnm.updateFavoredNodes(regionFNMap); + } + + /* + * Reuse BaseLoadBalancer's retainAssignment, but generate favored nodes when its missing. + */ + @Override + public Map> retainAssignment(Map regions, + List servers) throws HBaseIOException { + + Map> assignmentMap = Maps.newHashMap(); + Map> result = super.retainAssignment(regions, servers); + if (result == null || result.isEmpty()) { + LOG.warn("Nothing to assign to, probably no servers or no regions"); + return null; + } + + // Guarantee not to put other regions on master + if (servers != null && servers.contains(masterServerName)) { + servers = new ArrayList<>(servers); + servers.remove(masterServerName); + } + + // Lets check if favored nodes info is in META, if not generate now. + FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, getConf()); + helper.initialize(); + + LOG.debug("Generating favored nodes for regions missing them."); + Map> regionFNMap = Maps.newHashMap(); + + try { + for (Entry> entry : result.entrySet()) { + + ServerName sn = entry.getKey(); + ServerName primary = ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE); + + for (HRegionInfo hri : entry.getValue()) { + + if (FavoredNodesManager.isFavoredNodeApplicable(hri)) { + List favoredNodes = fnm.getFavoredNodes(hri); + if (favoredNodes == null || favoredNodes.size() < FAVORED_NODES_NUM) { + + LOG.debug("Generating favored nodes for: " + hri + " with primary: " + primary); + ServerName[] secondaryAndTertiaryNodes = helper.getSecondaryAndTertiary(hri, primary); + if (secondaryAndTertiaryNodes != null && secondaryAndTertiaryNodes.length == 2) { + List newFavoredNodes = Lists.newArrayList(); + newFavoredNodes.add(primary); + newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(), + secondaryAndTertiaryNodes[0].getPort(), NON_STARTCODE)); + newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(), + 
secondaryAndTertiaryNodes[1].getPort(), NON_STARTCODE)); + regionFNMap.put(hri, newFavoredNodes); + addRegionToMap(assignmentMap, hri, sn); + + } else { + throw new HBaseIOException("Cannot generate secondary/tertiary FN for " + hri + + " generated " + + (secondaryAndTertiaryNodes != null ? secondaryAndTertiaryNodes : " nothing")); + } + } else { + List onlineFN = getOnlineFavoredNodes(servers, favoredNodes); + if (onlineFN.isEmpty()) { + // All favored nodes are dead, lets assign it to BOGUS + addRegionToMap(assignmentMap, hri, BOGUS_SERVER_NAME); + } else { + // Is primary not on FN? Less likely, but we can still take care of this. + if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, sn) != null) { + addRegionToMap(assignmentMap, hri, sn); + } else { + ServerName destination = onlineFN.get(RANDOM.nextInt(onlineFN.size())); + LOG.warn("Region: " + hri + " not hosted on favored nodes: " + favoredNodes + + " current: " + sn + " moving to: " + destination); + addRegionToMap(assignmentMap, hri, destination); + } + } + } + } else { + addRegionToMap(assignmentMap, hri, sn); + } + } + } + + if (!regionFNMap.isEmpty()) { + LOG.debug("Updating FN in meta for missing regions, count: " + regionFNMap.size()); + fnm.updateFavoredNodes(regionFNMap); + } + + } catch (IOException e) { + throw new HBaseIOException("Cannot generate/update FN for regions: " + regionFNMap.keySet()); + } + + return assignmentMap; + } + + /* + * Return list of favored nodes that are online. + */ + private List getOnlineFavoredNodes(List onlineServers, + List serversWithoutStartCodes) { + if (serversWithoutStartCodes == null) { + return null; + } else { + List result = Lists.newArrayList(); + for (ServerName sn : serversWithoutStartCodes) { + for (ServerName online : onlineServers) { + if (ServerName.isSameHostnameAndPort(sn, online)) { + result.add(online); + } + } + } + return result; + } + } + + /* + * Generate Favored Nodes for daughters during region split. 
+ * + * If the parent does not have FN, regenerates them for the daughters. + * + * If the parent has FN, inherit two FN from parent for each daughter and generate the remaining. + * The primary FN for both the daughters should be the same as parent. Inherit the secondary + * FN from the parent but keep it different for each daughter. Choose the remaining FN + * randomly. This would give us better distribution over a period of time after enough splits. + */ + @Override + public void generateFavoredNodesForDaughter(List servers, HRegionInfo parent, + HRegionInfo regionA, HRegionInfo regionB) throws IOException { + + Map> result = new HashMap<>(); + FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager); + helper.initialize(); + + List parentFavoredNodes = fnm.getFavoredNodes(parent); + if (parentFavoredNodes == null) { + LOG.debug("Unable to find favored nodes for parent, " + parent + + " generating new favored nodes for daughter"); + result.put(regionA, helper.generateFavoredNodes(regionA)); + result.put(regionB, helper.generateFavoredNodes(regionB)); + + } else { + + // Lets get the primary and secondary from parent for regionA + Set regionAFN = + getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, SECONDARY); + result.put(regionA, Lists.newArrayList(regionAFN)); + + // Lets get the primary and tertiary from parent for regionB + Set regionBFN = + getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, TERTIARY); + result.put(regionB, Lists.newArrayList(regionBFN)); + } + + fnm.updateFavoredNodes(result); + } + + private Set getInheritedFNForDaughter(FavoredNodeAssignmentHelper helper, + List parentFavoredNodes, Position primary, Position secondary) + throws IOException { + + Set daughterFN = Sets.newLinkedHashSet(); + if (parentFavoredNodes.size() >= primary.ordinal()) { + daughterFN.add(parentFavoredNodes.get(primary.ordinal())); + } + + if (parentFavoredNodes.size() >= secondary.ordinal()) { + 
daughterFN.add(parentFavoredNodes.get(secondary.ordinal())); + } + + while (daughterFN.size() < FAVORED_NODES_NUM) { + ServerName newNode = helper.generateMissingFavoredNode(Lists.newArrayList(daughterFN)); + daughterFN.add(newNode); + } + return daughterFN; + } + + /* + * Generate favored nodes for a region during merge. Choose the FN from one of the sources to + * keep it simple. + */ + @Override + public void generateFavoredNodesForMergedRegion(HRegionInfo merged, HRegionInfo regionA, + HRegionInfo regionB) throws IOException { + updateFavoredNodesForRegion(merged, fnm.getFavoredNodes(regionA)); + } + + /* + * Pick favored nodes with the highest locality for a region with lowest locality. + */ + private class FavoredNodeLocalityPicker extends CandidateGenerator { + + @Override + protected Cluster.Action generate(Cluster cluster) { + + int thisServer = pickRandomServer(cluster); + int thisRegion; + if (thisServer == -1) { + LOG.trace("Could not pick lowest local region server"); + return Cluster.NullAction; + } else { + // Pick lowest local region on this server + thisRegion = pickLowestLocalRegionOnServer(cluster, thisServer); + } + if (thisRegion == -1) { + if (cluster.regionsPerServer[thisServer].length > 0) { + LOG.trace("Could not pick lowest local region even when region server held " + + cluster.regionsPerServer[thisServer].length + " regions"); + } + return Cluster.NullAction; + } + + HRegionInfo hri = cluster.regions[thisRegion]; + List favoredNodes = fnm.getFavoredNodes(hri); + int otherServer; + if (favoredNodes == null) { + if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) { + otherServer = pickOtherRandomServer(cluster, thisServer); + } else { + // No FN, ignore + LOG.trace("Ignoring, no favored nodes for region: " + hri); + return Cluster.NullAction; + } + } else { + // Pick other favored node with the highest locality + otherServer = getDifferentFavoredNode(cluster, favoredNodes, thisServer); + } + return getAction(thisServer, thisRegion, 
otherServer, -1); + } + + private int getDifferentFavoredNode(Cluster cluster, List favoredNodes, + int currentServer) { + List fnIndex = new ArrayList<>(); + for (ServerName sn : favoredNodes) { + if (cluster.serversToIndex.containsKey(sn.getHostAndPort())) { + fnIndex.add(cluster.serversToIndex.get(sn.getHostAndPort())); + } + } + float locality = 0; + int highestLocalRSIndex = -1; + for (Integer index : fnIndex) { + if (index != currentServer) { + float temp = cluster.localityPerServer[index]; + if (temp >= locality) { + locality = temp; + highestLocalRSIndex = index; + } + } + } + return highestLocalRSIndex; + } + + private int pickLowestLocalRegionOnServer(Cluster cluster, int server) { + return cluster.getLowestLocalityRegionOnServer(server); + } + } + + /* + * This is like LoadCandidateGenerator, but we choose appropriate FN for the region on the + * most loaded server. + */ + class FavoredNodeLoadPicker extends CandidateGenerator { + + @Override + Cluster.Action generate(Cluster cluster) { + cluster.sortServersByRegionCount(); + int thisServer = pickMostLoadedServer(cluster); + int thisRegion = pickRandomRegion(cluster, thisServer, 0); + HRegionInfo hri = cluster.regions[thisRegion]; + int otherServer; + List favoredNodes = fnm.getFavoredNodes(hri); + if (favoredNodes == null) { + if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) { + otherServer = pickLeastLoadedServer(cluster, thisServer); + } else { + return Cluster.NullAction; + } + } else { + otherServer = pickLeastLoadedFNServer(cluster, favoredNodes, thisServer); + } + return getAction(thisServer, thisRegion, otherServer, -1); + } + + private int pickLeastLoadedServer(final Cluster cluster, int thisServer) { + Integer[] servers = cluster.serverIndicesSortedByRegionCount; + int index; + for (index = 0; index < servers.length ; index++) { + if ((servers[index] != null) && servers[index] != thisServer) { + break; + } + } + return servers[index]; + } + + private int pickLeastLoadedFNServer(final 
Cluster cluster, List favoredNodes, + int currentServerIndex) { + List fnIndex = new ArrayList<>(); + for (ServerName sn : favoredNodes) { + if (cluster.serversToIndex.containsKey(sn.getHostAndPort())) { + fnIndex.add(cluster.serversToIndex.get(sn.getHostAndPort())); + } + } + int leastLoadedFN = -1; + int load = Integer.MAX_VALUE; + for (Integer index : fnIndex) { + if (index != currentServerIndex) { + int temp = cluster.getNumRegions(index); + if (temp < load) { + load = temp; + leastLoadedFN = index; + } + } + } + return leastLoadedFN; + } + + private int pickMostLoadedServer(final Cluster cluster) { + Integer[] servers = cluster.serverIndicesSortedByRegionCount; + int index; + for (index = servers.length - 1; index > 0 ; index--) { + if (servers[index] != null) { + break; + } + } + return servers[index]; + } + } + + /* + * For all regions correctly assigned to favored nodes, we just use the stochastic balancer + * implementation. For the misplaced regions, we assign a bogus server to it and AM takes care. + */ + @Override + public List balanceCluster(Map> clusterState) { + + if (this.services != null) { + + List regionPlans = Lists.newArrayList(); + Map> correctAssignments = new HashMap<>(); + int misplacedRegions = 0; + + for (Entry> entry : clusterState.entrySet()) { + ServerName current = entry.getKey(); + List regions = Lists.newArrayList(); + correctAssignments.put(current, regions); + + for (HRegionInfo hri : entry.getValue()) { + List favoredNodes = fnm.getFavoredNodes(hri); + if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, current) != null || + !FavoredNodesManager.isFavoredNodeApplicable(hri)) { + regions.add(hri); + + } else { + // No favored nodes, lets unassign. + LOG.warn("Region not on favored nodes, unassign. 
Region: " + hri + + " current: " + current + " favored nodes: " + favoredNodes); + this.services.getAssignmentManager().unassign(hri); + RegionPlan rp = new RegionPlan(hri, null, null); + regionPlans.add(rp); + misplacedRegions++; + } + } + } + LOG.debug("Found misplaced regions: " + misplacedRegions + ", not on favored nodes."); + List regionPlansFromBalance = super.balanceCluster(correctAssignments); + if (regionPlansFromBalance != null) { + regionPlans.addAll(regionPlansFromBalance); + } + return regionPlans; + } else { + return super.balanceCluster(clusterState); + } + } +} + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 01058d85775..53db1f2ddab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -28,9 +28,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -103,6 +103,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { "hbase.master.balancer.stochastic.stepsPerRegion"; protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps"; + protected static final String RUN_MAX_STEPS_KEY = + "hbase.master.balancer.stochastic.runMaxSteps"; protected static final String MAX_RUNNING_TIME_KEY = "hbase.master.balancer.stochastic.maxRunningTime"; protected static final String KEEP_REGION_LOADS = @@ -111,19 +113,20 @@ public class 
StochasticLoadBalancer extends BaseLoadBalancer { protected static final String MIN_COST_NEED_BALANCE_KEY = "hbase.master.balancer.stochastic.minCostNeedBalance"; - private static final Random RANDOM = new Random(System.currentTimeMillis()); + protected static final Random RANDOM = new Random(System.currentTimeMillis()); private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class); Map> loads = new HashMap<>(); // values are defaults private int maxSteps = 1000000; + private boolean runMaxSteps = false; private int stepsPerRegion = 800; private long maxRunningTime = 30 * 1000 * 1; // 30 seconds. private int numRegionLoadsToRemember = 15; private float minCostNeedBalance = 0.05f; - private CandidateGenerator[] candidateGenerators; + private List candidateGenerators; private CostFromRegionLoadFunction[] regionLoadFunctions; private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC @@ -160,6 +163,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps); stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion); maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime); + runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps); + numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember); isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable); minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance); @@ -167,13 +172,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { localityCandidateGenerator = new LocalityBasedCandidateGenerator(services); } localityCost = new LocalityCostFunction(conf, services); - if (candidateGenerators == null) { - candidateGenerators = new CandidateGenerator[] { - new RandomCandidateGenerator(), - new LoadCandidateGenerator(), - localityCandidateGenerator, - new RegionReplicaRackCandidateGenerator(), - }; + + if 
(this.candidateGenerators == null) { + candidateGenerators = Lists.newArrayList(); + candidateGenerators.add(new RandomCandidateGenerator()); + candidateGenerators.add(new LoadCandidateGenerator()); + candidateGenerators.add(localityCandidateGenerator); + candidateGenerators.add(new RegionReplicaRackCandidateGenerator()); } regionLoadFunctions = new CostFromRegionLoadFunction[] { new ReadRequestCostFunction(conf), @@ -202,6 +207,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc."); } + protected void setCandidateGenerators(List customCandidateGenerators) { + this.candidateGenerators = customCandidateGenerators; + } + @Override protected void setSlop(Configuration conf) { this.slop = conf.getFloat("hbase.regions.slop", 0.001F); @@ -352,14 +361,20 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { double initCost = currentCost; double newCost = currentCost; - long computedMaxSteps = Math.min(this.maxSteps, - ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers)); + long computedMaxSteps; + if (runMaxSteps) { + computedMaxSteps = Math.max(this.maxSteps, + ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers)); + } else { + computedMaxSteps = Math.min(this.maxSteps, + ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers)); + } // Perform a stochastic walk to see if we can get a good fit. 
long step; for (step = 0; step < computedMaxSteps; step++) { - int generatorIdx = RANDOM.nextInt(candidateGenerators.length); - CandidateGenerator p = candidateGenerators[generatorIdx]; + int generatorIdx = RANDOM.nextInt(candidateGenerators.size()); + CandidateGenerator p = candidateGenerators.get(generatorIdx); Cluster.Action action = p.generate(cluster); if (action.type == Type.NULL) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index c0efc7be6f5..1044a180150 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -354,6 +354,35 @@ public class MiniHBaseCluster extends HBaseCluster { return t; } + /** + * Starts a region server thread and waits until its processed by master. Throws an exception + * when it can't start a region server or when the region server is not processed by master + * within the timeout. + * + * @return New RegionServerThread + * @throws IOException + */ + public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout) + throws IOException { + + JVMClusterUtil.RegionServerThread t = startRegionServer(); + ServerName rsServerName = t.getRegionServer().getServerName(); + + long start = System.currentTimeMillis(); + ClusterStatus clusterStatus = getClusterStatus(); + while ((System.currentTimeMillis() - start) < timeout) { + if (clusterStatus != null && clusterStatus.getServers().contains(rsServerName)) { + return t; + } + Threads.sleep(100); + } + if (t.getRegionServer().isOnline()) { + throw new IOException("RS: " + rsServerName + " online, but not processed by master"); + } else { + throw new IOException("RS: " + rsServerName + " is offline"); + } + } + /** * Cause a region server to exit doing basic clean up only on its way out. * @param serverNumber Used as index into a list. 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index f3faa415c98..461ff8c4b89 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -577,7 +577,7 @@ public class TestZooKeeper { @Override public Map> retainAssignment( - Map regions, List servers) { + Map regions, List servers) throws HBaseIOException { retainAssignCalled = true; return super.retainAssignment(regions, servers); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableFavoredNodes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableFavoredNodes.java index f587d207f28..3eb65a503cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableFavoredNodes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableFavoredNodes.java @@ -37,7 +37,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper; -import org.apache.hadoop.hbase.favored.FavoredNodeLoadBalancer; import org.apache.hadoop.hbase.favored.FavoredNodesManager; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -50,6 +49,7 @@ import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; +import org.apache.hadoop.hbase.master.balancer.LoadOnlyFavoredStochasticBalancer; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -90,7 +90,7 @@ public class 
TestTableFavoredNodes { Configuration conf = TEST_UTIL.getConfiguration(); // Setting FavoredNodeBalancer will enable favored nodes conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, - FavoredNodeLoadBalancer.class, LoadBalancer.class); + LoadOnlyFavoredStochasticBalancer.class, LoadBalancer.class); conf.set(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, "" + SLAVES); // This helps test if RS get the appropriate FN updates. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java index 449e1e6ab08..23e61f6a729 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java @@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -1267,7 +1268,7 @@ public class TestAssignmentManagerOnCluster { @Override public ServerName randomAssignment(HRegionInfo regionInfo, - List servers) { + List servers) throws HBaseIOException { if (regionInfo.equals(controledRegion)) { return null; } @@ -1276,7 +1277,7 @@ public class TestAssignmentManagerOnCluster { @Override public Map> roundRobinAssignment( - List regions, List servers) { + List regions, List servers) throws HBaseIOException { if (countRegionServers != null && services != null) { int regionServers = services.getServerManager().countOfRegionServers(); if (regionServers < countRegionServers.intValue()) { @@ -1296,7 +1297,7 @@ public class TestAssignmentManagerOnCluster { @Override public Map> 
retainAssignment( - Map regions, List servers) { + Map regions, List servers) throws HBaseIOException { for (HRegionInfo hri : regions.keySet()) { if (hri.equals(controledRegion)) { Map> m = Maps.newHashMap(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java new file mode 100644 index 00000000000..276d65e8e89 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.List; + +import com.google.common.collect.Lists; + +/** + * Used for FavoredNode unit tests + */ +public class LoadOnlyFavoredStochasticBalancer extends FavoredStochasticBalancer { + + @Override + protected void configureGenerators() { + List fnPickers = Lists.newArrayList(); + fnPickers.add(new FavoredNodeLoadPicker()); + setCandidateGenerators(fnPickers); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredNodeTableImport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredNodeTableImport.java new file mode 100644 index 00000000000..a6ee897be3a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredNodeTableImport.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; + +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.favored.FavoredNodesManager; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; + +import com.google.common.collect.Sets; +import org.junit.After; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/* + * This case tests a scenario when a cluster with tables is moved from Stochastic Load Balancer + * to FavoredStochasticLoadBalancer and the generation of favored nodes after switch. 
+ */ +@Category(MediumTests.class) +public class TestFavoredNodeTableImport { + + private static final Log LOG = LogFactory.getLog(TestFavoredNodeTableImport.class); + + private static final int SLAVES = 3; + private static final int REGION_NUM = 20; + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final Configuration conf = UTIL.getConfiguration(); + + @After + public void stopCluster() throws Exception { + UTIL.cleanupTestDir(); + UTIL.shutdownMiniCluster(); + } + + @Test + public void testTableCreation() throws Exception { + + conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, StochasticLoadBalancer.class.getName()); + + LOG.info("Starting up cluster"); + UTIL.startMiniCluster(SLAVES); + while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) { + Threads.sleep(1); + } + Admin admin = UTIL.getAdmin(); + admin.setBalancerRunning(false, true); + + String tableName = "testFNImport"; + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc, Bytes.toBytes("a"), Bytes.toBytes("z"), REGION_NUM); + UTIL.waitTableAvailable(desc.getTableName()); + + LOG.info("Shutting down cluster"); + UTIL.shutdownMiniHBaseCluster(); + + Thread.sleep(2000); + LOG.info("Starting cluster again with FN Balancer"); + UTIL.getConfiguration().set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, + FavoredStochasticBalancer.class.getName()); + UTIL.restartHBaseCluster(SLAVES); + while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) { + Threads.sleep(1); + } + admin = UTIL.getAdmin(); + + UTIL.waitTableAvailable(desc.getTableName()); + + FavoredNodesManager fnm = UTIL.getHBaseCluster().getMaster().getFavoredNodesManager(); + + List regionsOfTable = admin.getTableRegions(TableName.valueOf(tableName)); + for (HRegionInfo rInfo : regionsOfTable) { + Set favNodes = Sets.newHashSet(fnm.getFavoredNodes(rInfo)); + 
assertNotNull(favNodes); + assertEquals("Required no of favored nodes not found.", FAVORED_NODES_NUM, favNodes.size()); + for (ServerName fn : favNodes) { + assertEquals("StartCode invalid for:" + fn, ServerName.NON_STARTCODE, fn.getStartcode()); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java new file mode 100644 index 00000000000..f806472e6a8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.master.RackManager; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.favored.FavoredNodesManager; +import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +@Category(LargeTests.class) +public class TestFavoredStochasticBalancerPickers extends BalancerTestBase { + + private static final Log LOG = LogFactory.getLog(TestFavoredStochasticBalancerPickers.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final int SLAVES = 6; + private static final int REGIONS = SLAVES * 3; + private static Configuration conf; + + private Admin 
admin; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + conf = TEST_UTIL.getConfiguration(); + // Enable favored nodes based load balancer + conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, + LoadOnlyFavoredStochasticBalancer.class, LoadBalancer.class); + conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 30000); + conf.setInt("hbase.master.balancer.stochastic.moveCost", 0); + conf.setBoolean("hbase.master.balancer.stochastic.execute.maxSteps", true); + conf.set(BaseLoadBalancer.TABLES_ON_MASTER, "none"); + } + + @Before + public void startCluster() throws Exception { + TEST_UTIL.startMiniCluster(SLAVES); + TEST_UTIL.getDFSCluster().waitClusterUp(); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(120*1000); + admin = TEST_UTIL.getAdmin(); + admin.setBalancerRunning(false, true); + } + + @After + public void stopCluster() throws Exception { + TEST_UTIL.cleanupTestDir(); + TEST_UTIL.shutdownMiniCluster(); + } + + + @Test + public void testPickers() throws Exception { + + TableName tableName = TableName.valueOf("testPickers"); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGIONS); + TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY); + admin.flush(tableName); + + ServerName masterServerName = TEST_UTIL.getMiniHBaseCluster().getServerHoldingMeta(); + final ServerName mostLoadedServer = getRSWithMaxRegions(Lists.newArrayList(masterServerName)); + assertNotNull(mostLoadedServer); + int numRegions = admin.getOnlineRegions(mostLoadedServer).size(); + ServerName source = getRSWithMaxRegions(Lists.newArrayList(masterServerName, mostLoadedServer)); + assertNotNull(source); + int regionsToMove = admin.getOnlineRegions(source).size()/2; + List hris = admin.getOnlineRegions(source); + for (int i = 0; i < regionsToMove; i++) { + 
admin.move(hris.get(i).getEncodedNameAsBytes(), Bytes.toBytes(mostLoadedServer.getServerName())); + LOG.info("Moving region: " + hris.get(i).getRegionNameAsString() + " to " + mostLoadedServer); + } + final int finalRegions = numRegions + regionsToMove; + TEST_UTIL.waitUntilNoRegionsInTransition(60000); + TEST_UTIL.waitFor(60000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + int numRegions = TEST_UTIL.getAdmin().getOnlineRegions(mostLoadedServer).size(); + return (numRegions == finalRegions); + } + }); + TEST_UTIL.getHBaseCluster().startRegionServerAndWait(60000); + + Map> serverAssignments = Maps.newHashMap(); + ClusterStatus status = admin.getClusterStatus(); + for (ServerName sn : status.getServers()) { + if (!ServerName.isSameHostnameAndPort(sn, masterServerName)) { + serverAssignments.put(sn, admin.getOnlineRegions(sn)); + } + } + RegionLocationFinder regionFinder = new RegionLocationFinder(); + regionFinder.setClusterStatus(admin.getClusterStatus()); + regionFinder.setConf(conf); + regionFinder.setServices(TEST_UTIL.getMiniHBaseCluster().getMaster()); + Cluster cluster = new Cluster(serverAssignments, null, regionFinder, new RackManager(conf)); + LoadOnlyFavoredStochasticBalancer balancer = (LoadOnlyFavoredStochasticBalancer) TEST_UTIL + .getMiniHBaseCluster().getMaster().getLoadBalancer(); + FavoredNodesManager fnm = TEST_UTIL.getMiniHBaseCluster().getMaster().getFavoredNodesManager(); + cluster.sortServersByRegionCount(); + Integer[] servers = cluster.serverIndicesSortedByRegionCount; + LOG.info("Servers sorted by region count:" + Arrays.toString(servers)); + LOG.info("Cluster dump: " + cluster); + if (!mostLoadedServer.equals(cluster.servers[servers[servers.length -1]])) { + LOG.error("Most loaded server: " + mostLoadedServer + " does not match: " + + cluster.servers[servers[servers.length -1]]); + } + assertEquals(mostLoadedServer, cluster.servers[servers[servers.length -1]]); + 
FavoredStochasticBalancer.FavoredNodeLoadPicker loadPicker = balancer.new FavoredNodeLoadPicker(); + boolean userRegionPicked = false; + for (int i = 0; i < 100; i++) { + if (userRegionPicked) { + break; + } else { + Cluster.Action action = loadPicker.generate(cluster); + if (action.type == Cluster.Action.Type.MOVE_REGION) { + Cluster.MoveRegionAction moveRegionAction = (Cluster.MoveRegionAction) action; + HRegionInfo region = cluster.regions[moveRegionAction.region]; + assertNotEquals(-1, moveRegionAction.toServer); + ServerName destinationServer = cluster.servers[moveRegionAction.toServer]; + assertEquals(cluster.servers[moveRegionAction.fromServer], mostLoadedServer); + if (!region.getTable().isSystemTable()) { + List favNodes = fnm.getFavoredNodes(region); + assertTrue(favNodes.contains(ServerName.valueOf(destinationServer.getHostAndPort(), -1))); + userRegionPicked = true; + } + } + } + } + assertTrue("load picker did not pick expected regions in 100 iterations.", userRegionPicked); + } + + private ServerName getRSWithMaxRegions(ArrayList excludeNodes) throws IOException { + int maxRegions = 0; + ServerName maxLoadedServer = null; + + for (ServerName sn : admin.getClusterStatus().getServers()) { + if (admin.getOnlineRegions(sn).size() > maxRegions) { + if (excludeNodes == null || !doesMatchExcludeNodes(excludeNodes, sn)) { + maxRegions = admin.getOnlineRegions(sn).size(); + maxLoadedServer = sn; + } + } + } + return maxLoadedServer; + } + + private boolean doesMatchExcludeNodes(ArrayList excludeNodes, ServerName sn) { + for (ServerName excludeSN : excludeNodes) { + if (ServerName.isSameHostnameAndPort(sn, excludeSN)) { + return true; + } + } + return false; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticLoadBalancer.java new file mode 100644 index 00000000000..31385675a50 --- /dev/null +++ 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
 * End-to-end tests for the favored-nodes-aware stochastic load balancer: assignment always lands
 * on favored nodes, favored nodes are (re)generated when missing, and the system recovers when
 * some or all favored nodes die (including across a master restart).
 */
@Category(MediumTests.class)
public class TestFavoredStochasticLoadBalancer extends BalancerTestBase {

  private static final Log LOG = LogFactory.getLog(TestFavoredStochasticLoadBalancer.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final int SLAVES = 8;
  private static final int REGION_NUM = SLAVES * 3;

  private Admin admin;
  private HMaster master;
  private MiniHBaseCluster cluster;

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // Enable the favored nodes based load balancer
    conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
        LoadOnlyFavoredStochasticBalancer.class, LoadBalancer.class);
  }

  @Before
  public void startCluster() throws Exception {
    TEST_UTIL.startMiniCluster(SLAVES);
    TEST_UTIL.getDFSCluster().waitClusterUp();
    cluster = TEST_UTIL.getMiniHBaseCluster();
    master = TEST_UTIL.getMiniHBaseCluster().getMaster();
    admin = TEST_UTIL.getAdmin();
    // Tests drive balancing explicitly; keep the balancer chore from interfering.
    admin.setBalancerRunning(false, true);
  }

  @After
  public void stopCluster() throws Exception {
    TEST_UTIL.cleanupTestDir();
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Regions must stick to their favored nodes: after two new (non-favored) region servers join
   * and the balancer runs, neither new server may host any region of the test table.
   */
  @Test
  public void testBasicBalance() throws Exception {

    TableName tableName = TableName.valueOf("testBasicBalance");
    HTableDescriptor desc = new HTableDescriptor(tableName);
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
    TEST_UTIL.waitTableAvailable(tableName);
    TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY);
    admin.flush(tableName);
    compactTable(tableName);

    JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServerAndWait(10000);
    JVMClusterUtil.RegionServerThread rs2 = cluster.startRegionServerAndWait(10000);

    // Now try to run balance, and verify no regions are moved to the 2 region servers recently
    // started.
    admin.setBalancerRunning(true, true);
    assertTrue("Balancer did not run", admin.balancer());
    TEST_UTIL.waitUntilNoRegionsInTransition(120000);

    List<HRegionInfo> hris = admin.getOnlineRegions(rs1.getRegionServer().getServerName());
    for (HRegionInfo hri : hris) {
      assertFalse("New RS contains regions belonging to table: " + tableName,
          hri.getTable().equals(tableName));
    }
    hris = admin.getOnlineRegions(rs2.getRegionServer().getServerName());
    for (HRegionInfo hri : hris) {
      assertFalse("New RS contains regions belonging to table: " + tableName,
          hri.getTable().equals(tableName));
    }
  }

  /** Round-robin assignment must place every region (user, meta and namespace alike). */
  @Test
  public void testRoundRobinAssignment() throws Exception {

    TableName tableName = TableName.valueOf("testRoundRobinAssignment");
    HTableDescriptor desc = new HTableDescriptor(tableName);
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
    TEST_UTIL.waitTableAvailable(tableName);
    TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY);
    admin.flush(tableName);

    LoadBalancer balancer = master.getLoadBalancer();
    List<HRegionInfo> regions = admin.getTableRegions(tableName);
    regions.addAll(admin.getTableRegions(TableName.META_TABLE_NAME));
    regions.addAll(admin.getTableRegions(TableName.NAMESPACE_TABLE_NAME));
    List<ServerName> servers = Lists.newArrayList(admin.getClusterStatus().getServers());
    Map<ServerName, List<HRegionInfo>> map = balancer.roundRobinAssignment(regions, servers);
    for (List<HRegionInfo> regionInfos : map.values()) {
      regions.removeAll(regionInfos);
    }
    assertEquals("No region should be missed by balancer", 0, regions.size());
  }

  /**
   * Every region gets a full set of favored nodes on creation, and the per-server replica-load
   * report stays consistent before and after the table is deleted.
   */
  @Test
  public void testBasicRegionPlacementAndReplicaLoad() throws Exception {

    String tableName = "testBasicRegionPlacement";
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
    TEST_UTIL.waitTableAvailable(desc.getTableName());

    FavoredNodesManager fnm = master.getFavoredNodesManager();
    List<HRegionInfo> regionsOfTable = admin.getTableRegions(TableName.valueOf(tableName));
    for (HRegionInfo rInfo : regionsOfTable) {
      Set<ServerName> favNodes = Sets.newHashSet(fnm.getFavoredNodes(rInfo));
      assertNotNull(favNodes);
      assertEquals(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, favNodes.size());
    }

    Map<ServerName, List<Integer>> replicaLoadMap =
        fnm.getReplicaLoad(Lists.newArrayList(admin.getClusterStatus().getServers()));
    assertTrue("Not all replica load collected.",
        admin.getClusterStatus().getServers().size() == replicaLoadMap.size());
    for (Entry<ServerName, List<Integer>> entry : replicaLoadMap.entrySet()) {
      assertTrue(entry.getValue().size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
      assertTrue(entry.getValue().get(0) >= 0);
      assertTrue(entry.getValue().get(1) >= 0);
      assertTrue(entry.getValue().get(2) >= 0);
    }

    admin.disableTable(TableName.valueOf(tableName));
    admin.deleteTable(TableName.valueOf(tableName));
    // Even after the table is gone, every live server should still report a load entry.
    replicaLoadMap =
        fnm.getReplicaLoad(Lists.newArrayList(admin.getClusterStatus().getServers()));
    assertTrue("Replica load entries found for " + replicaLoadMap.size()
        + " servers instead of " + admin.getClusterStatus().getServers().size() + ".",
        replicaLoadMap.size() == admin.getClusterStatus().getServers().size());
  }

  /**
   * randomAssignment() must regenerate favored nodes for a region that lost them, and the chosen
   * destination must be one of the new favored nodes.
   */
  @Test
  public void testRandomAssignmentWithNoFavNodes() throws Exception {

    final String tableName = "testRandomAssignmentWithNoFavNodes";
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc);
    TEST_UTIL.waitTableAvailable(desc.getTableName());

    HRegionInfo hri = admin.getTableRegions(TableName.valueOf(tableName)).get(0);

    FavoredNodesManager fnm = master.getFavoredNodesManager();
    fnm.deleteFavoredNodesForRegions(Lists.newArrayList(hri));
    assertNull("Favored nodes not found null after delete", fnm.getFavoredNodes(hri));

    LoadBalancer balancer = master.getLoadBalancer();
    ServerName destination = balancer.randomAssignment(hri,
        Lists.newArrayList(admin.getClusterStatus().getServers()));
    assertNotNull(destination);
    List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
    assertNotNull(favoredNodes);
    boolean containsFN = false;
    for (ServerName sn : favoredNodes) {
      if (ServerName.isSameHostnameAndPort(destination, sn)) {
        containsFN = true;
      }
    }
    assertTrue("Destination server does not belong to favored nodes.", containsFN);
  }

  /**
   * The balancer must recover a region whose favored nodes were deleted: after an explicit assign
   * the region gets a full FN set, and a subsequent balance leaves all regions on favored nodes.
   */
  @Test
  public void testBalancerWithoutFavoredNodes() throws Exception {

    TableName tableName = TableName.valueOf("testBalancerWithoutFavoredNodes");
    HTableDescriptor desc = new HTableDescriptor(tableName);
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
    TEST_UTIL.waitTableAvailable(tableName);

    final HRegionInfo region = admin.getTableRegions(tableName).get(0);
    LOG.info("Region thats supposed to be in transition: " + region);
    FavoredNodesManager fnm = master.getFavoredNodesManager();
    List<ServerName> currentFN = fnm.getFavoredNodes(region);
    assertNotNull(currentFN);

    fnm.deleteFavoredNodesForRegions(Lists.newArrayList(region));

    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
    admin.setBalancerRunning(true, true);

    // Balancer should unassign the region
    assertTrue("Balancer did not run", admin.balancer());
    TEST_UTIL.waitUntilNoRegionsInTransition();

    admin.assign(region.getEncodedNameAsBytes());
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);

    currentFN = fnm.getFavoredNodes(region);
    assertNotNull(currentFN);
    assertEquals("Expected number of FN not present",
        FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, currentFN.size());

    assertTrue("Balancer did not run", admin.balancer());
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);

    checkFavoredNodeAssignments(tableName, fnm, regionStates);
  }

  /**
   * After swapping a region's favored nodes for a disjoint set, the balancer must move the
   * region off its now-misplaced host onto one of the new favored nodes.
   */
  @Test
  public void testMisplacedRegions() throws Exception {

    TableName tableName = TableName.valueOf("testMisplacedRegions");
    HTableDescriptor desc = new HTableDescriptor(tableName);
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
    TEST_UTIL.waitTableAvailable(tableName);

    final HRegionInfo misplacedRegion = admin.getTableRegions(tableName).get(0);
    FavoredNodesManager fnm = master.getFavoredNodesManager();
    List<ServerName> currentFN = fnm.getFavoredNodes(misplacedRegion);
    assertNotNull(currentFN);

    // Build a candidate pool that excludes all current favored nodes, so the regenerated
    // FN set is guaranteed disjoint from the current one.
    List<ServerName> serversForNewFN = Lists.newArrayList();
    for (ServerName sn : admin.getClusterStatus().getServers()) {
      // Favored nodes are tracked without start codes.
      serversForNewFN.add(ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE));
    }
    for (ServerName sn : currentFN) {
      serversForNewFN.remove(sn);
    }
    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(serversForNewFN, conf);
    helper.initialize();
    List<ServerName> newFavoredNodes = helper.generateFavoredNodes(misplacedRegion);
    assertNotNull(newFavoredNodes);
    assertEquals(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, newFavoredNodes.size());
    Map<HRegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
    regionFNMap.put(misplacedRegion, newFavoredNodes);
    fnm.updateFavoredNodes(regionFNMap);

    final RegionStates regionStates = master.getAssignmentManager().getRegionStates();
    final ServerName current = regionStates.getRegionServerOfRegion(misplacedRegion);
    assertNull("Misplaced region is still hosted on favored node, not expected.",
        FavoredNodesPlan.getFavoredServerPosition(fnm.getFavoredNodes(misplacedRegion), current));
    admin.setBalancerRunning(true, true);
    assertTrue("Balancer did not run", admin.balancer());
    TEST_UTIL.waitFor(120000, 30000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        ServerName host = regionStates.getRegionServerOfRegion(misplacedRegion);
        return !ServerName.isSameHostnameAndPort(host, current);
      }
    });
    checkFavoredNodeAssignments(tableName, fnm, regionStates);
  }

  /**
   * With two of three favored nodes dead, the region must still come online (on the surviving
   * favored node) and a balance must keep everything on favored nodes.
   */
  @Test
  public void test2FavoredNodesDead() throws Exception {

    // NOTE: previously reused the table name of testAllFavoredNodesDead, which collides when
    // both tests run against the same fixture; each test uses its own table name.
    TableName tableName = TableName.valueOf("test2FavoredNodesDead");
    HTableDescriptor desc = new HTableDescriptor(tableName);
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
    TEST_UTIL.waitTableAvailable(tableName);

    final HRegionInfo region = admin.getTableRegions(tableName).get(0);
    LOG.info("Region that's supposed to be in transition: " + region);
    FavoredNodesManager fnm = master.getFavoredNodesManager();
    List<ServerName> currentFN = fnm.getFavoredNodes(region);
    assertNotNull(currentFN);

    List<ServerName> serversToStop = Lists.newArrayList(currentFN);
    serversToStop.remove(currentFN.get(0));

    // Lets kill 2 FN for the region. All regions should still be assigned
    stopServersAndWaitUntilProcessed(serversToStop);

    TEST_UTIL.waitUntilNoRegionsInTransition();
    final RegionStates regionStates = master.getAssignmentManager().getRegionStates();
    TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return regionStates.getRegionState(region).isOpened();
      }
    });

    assertEquals("Not all regions are online", REGION_NUM,
        admin.getTableRegions(tableName).size());
    admin.setBalancerRunning(true, true);
    assertTrue("Balancer did not run", admin.balancer());
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);

    checkFavoredNodeAssignments(tableName, fnm, regionStates);
  }

  /**
   * With all favored nodes dead, the region goes FAILED_OPEN; regenerating favored nodes and
   * re-assigning must bring everything back online on favored nodes.
   */
  @Test
  public void testAllFavoredNodesDead() throws Exception {

    TableName tableName = TableName.valueOf("testAllFavoredNodesDead");
    HTableDescriptor desc = new HTableDescriptor(tableName);
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
    TEST_UTIL.waitTableAvailable(tableName);

    final HRegionInfo region = admin.getTableRegions(tableName).get(0);
    LOG.info("Region that's supposed to be in transition: " + region);
    FavoredNodesManager fnm = master.getFavoredNodesManager();
    List<ServerName> currentFN = fnm.getFavoredNodes(region);
    assertNotNull(currentFN);

    // Lets kill all the RS that are favored nodes for this region.
    stopServersAndWaitUntilProcessed(currentFN);

    final RegionStates regionStates = master.getAssignmentManager().getRegionStates();
    TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return regionStates.getRegionState(region).isFailedOpen();
      }
    });

    assertTrue("Region: " + region + " should be RIT",
        regionStates.getRegionState(region).isFailedOpen());

    // Regenerate FN and assign, everything else should be fine
    List<ServerName> serversForNewFN = Lists.newArrayList();
    for (ServerName sn : admin.getClusterStatus().getServers()) {
      serversForNewFN.add(ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE));
    }

    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(serversForNewFN, conf);
    helper.initialize();

    for (RegionState regionState : regionStates.getRegionsInTransition()) {
      HRegionInfo regionInfo = regionState.getRegion();
      List<ServerName> newFavoredNodes = helper.generateFavoredNodes(regionInfo);
      assertNotNull(newFavoredNodes);
      assertEquals(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, newFavoredNodes.size());
      LOG.info("Region: " + regionInfo.getEncodedName() + " FN: " + newFavoredNodes);

      Map<HRegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
      regionFNMap.put(regionInfo, newFavoredNodes);
      fnm.updateFavoredNodes(regionFNMap);
      LOG.info("Assigning region: " + regionInfo.getEncodedName());
      admin.assign(regionInfo.getEncodedNameAsBytes());
    }
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
    assertEquals("Not all regions are online", REGION_NUM,
        admin.getTableRegions(tableName).size());

    admin.setBalancerRunning(true, true);
    assertTrue("Balancer did not run", admin.balancer());
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);

    checkFavoredNodeAssignments(tableName, fnm, regionStates);
  }

  /**
   * Same scenario as {@link #testAllFavoredNodesDead()}, but the master is restarted while the
   * regions are FAILED_OPEN: the RIT state must survive the restart and recovery must still work.
   */
  @Test
  public void testAllFavoredNodesDeadMasterRestarted() throws Exception {

    TableName tableName = TableName.valueOf("testAllFavoredNodesDeadMasterRestarted");
    HTableDescriptor desc = new HTableDescriptor(tableName);
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM);
    TEST_UTIL.waitTableAvailable(tableName);

    final HRegionInfo region = admin.getTableRegions(tableName).get(0);
    LOG.info("Region that's supposed to be in transition: " + region);
    FavoredNodesManager fnm = master.getFavoredNodesManager();
    List<ServerName> currentFN = fnm.getFavoredNodes(region);
    assertNotNull(currentFN);

    // Lets kill all the RS that are favored nodes for this region.
    stopServersAndWaitUntilProcessed(currentFN);

    final RegionStates regionStatesBeforeMaster = master.getAssignmentManager().getRegionStates();
    TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return regionStatesBeforeMaster.getRegionState(region).isFailedOpen();
      }
    });

    assertTrue("Region: " + region + " should be RIT",
        regionStatesBeforeMaster.getRegionState(region).isFailedOpen());

    List<HRegionInfo> rit = Lists.newArrayList();
    for (RegionState regionState : regionStatesBeforeMaster.getRegionsInTransition()) {
      HRegionInfo regionInfo = regionState.getRegion();
      LOG.debug("Region in transition after stopping FN's: " + regionInfo);
      rit.add(regionInfo);
      assertTrue("Region: " + regionInfo + " should be RIT",
          regionStatesBeforeMaster.getRegionState(regionInfo).isFailedOpen());
      assertEquals("Region: " + regionInfo + " does not belong to table: " + tableName,
          tableName, regionInfo.getTable());
    }

    // Lower the RS-count the restarted master waits for, since we stopped FN servers.
    Configuration conf = cluster.getConf();
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
        SLAVES - FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);

    cluster.stopMaster(master.getServerName());
    cluster.waitForMasterToStop(master.getServerName(), 60000);

    cluster.startMaster();
    cluster.waitForActiveAndReadyMaster();
    master = cluster.getMaster();
    fnm = master.getFavoredNodesManager();

    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
    assertTrue("Region: " + region + " should be RIT",
        regionStates.getRegionState(region).isFailedOpen());

    for (HRegionInfo regionInfo : rit) {
      assertTrue("Region: " + regionInfo + " should be RIT",
          regionStates.getRegionState(regionInfo).isFailedOpen());
    }

    // Regenerate FN and assign, everything else should be fine
    List<ServerName> serversForNewFN = Lists.newArrayList();
    for (ServerName sn : admin.getClusterStatus().getServers()) {
      serversForNewFN.add(ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE));
    }

    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(serversForNewFN, conf);
    helper.initialize();

    for (HRegionInfo regionInfo : rit) {
      List<ServerName> newFavoredNodes = helper.generateFavoredNodes(regionInfo);
      assertNotNull(newFavoredNodes);
      assertEquals(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, newFavoredNodes.size());
      LOG.info("Region: " + regionInfo.getEncodedName() + " FN: " + newFavoredNodes);

      Map<HRegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
      regionFNMap.put(regionInfo, newFavoredNodes);
      fnm.updateFavoredNodes(regionFNMap);
      LOG.info("Assigning region: " + regionInfo.getEncodedName());
      admin.assign(regionInfo.getEncodedNameAsBytes());
    }
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
    assertEquals("Not all regions are online", REGION_NUM,
        admin.getTableRegions(tableName).size());

    admin.setBalancerRunning(true, true);
    assertTrue("Balancer did not run", admin.balancer());
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);

    checkFavoredNodeAssignments(tableName, fnm, regionStates);
  }

  /** Asserts every region of the table is hosted on one of its favored nodes. */
  private void checkFavoredNodeAssignments(TableName tableName, FavoredNodesManager fnm,
      RegionStates regionStates) throws IOException {
    for (HRegionInfo hri : admin.getTableRegions(tableName)) {
      ServerName host = regionStates.getRegionServerOfRegion(hri);
      assertNotNull("Region: " + hri.getEncodedName() + " not on FN, current: " + host
          + " FN list: " + fnm.getFavoredNodes(hri),
          FavoredNodesPlan.getFavoredServerPosition(fnm.getFavoredNodes(hri), host));
    }
  }

  /**
   * Stops every region server matching an entry in {@code currentFN} (by host and port), then
   * blocks until the master has finished processing the dead servers.
   */
  private void stopServersAndWaitUntilProcessed(List<ServerName> currentFN) throws Exception {
    for (ServerName sn : currentFN) {
      for (JVMClusterUtil.RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
        if (ServerName.isSameHostnameAndPort(sn, rst.getRegionServer().getServerName())) {
          LOG.info("Shutting down server: " + sn);
          cluster.stopRegionServer(rst.getRegionServer().getServerName());
          cluster.waitForRegionServerToStop(rst.getRegionServer().getServerName(), 60000);
        }
      }
    }

    // Wait until dead servers are processed.
    TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return !master.getServerManager().areDeadServersInProgress();
      }
    });

    assertEquals("Not all servers killed",
        SLAVES - currentFN.size(), cluster.getLiveRegionServerThreads().size());
  }

  /** Major-compacts every online region of the table on every region server. */
  private void compactTable(TableName tableName) throws IOException {
    for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
      for (Region region : t.getRegionServer().getOnlineRegions(tableName)) {
        region.compact(true);
      }
    }
  }
}