HBASE-2896 Retain assignment information between cluster shutdown/startup
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1032166 13f79535-47bb-0310-9956-ffa450edef68
Parent: ce7567c23b
Commit: ea5395d62b
CHANGES.txt
@@ -1172,6 +1172,8 @@ Release 0.21.0 - Unreleased
   HBASE-2201 JRuby shell for replication
   HBASE-2946 Increment multiple columns in a row at once
   HBASE-3013 Tool to verify data in two clusters
   HBASE-2896 Retain assignment information between cluster
              shutdown/startup


OPTIMIZATIONS
MetaReader.java
@@ -24,7 +24,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -128,6 +130,25 @@ public class MetaReader {
   */
  public static Map<HRegionInfo,HServerAddress> fullScan(
      CatalogTracker catalogTracker)
  throws IOException {
    return fullScan(catalogTracker, new TreeSet<String>());
  }

  /**
   * Performs a full scan of <code>.META.</code>, skipping regions from any
   * tables in the specified set of disabled tables.
   * <p>
   * Returns a map of every region to its currently assigned server, according
   * to META. If the region does not have an assignment it will have a null
   * value in the map.
   *
   * @param catalogTracker
   * @param disabledTables set of disabled tables that will not be returned
   * @return map of regions to their currently assigned server
   * @throws IOException
   */
  public static Map<HRegionInfo,HServerAddress> fullScan(
      CatalogTracker catalogTracker, final Set<String> disabledTables)
  throws IOException {
    final Map<HRegionInfo,HServerAddress> regions =
      new TreeMap<HRegionInfo,HServerAddress>();
@@ -137,6 +158,8 @@ public class MetaReader {
        if (r == null || r.isEmpty()) return true;
        Pair<HRegionInfo,HServerAddress> region = metaRowToRegionPair(r);
        if (region == null) return true;
        if (disabledTables.contains(
            region.getFirst().getTableDesc().getNameAsString())) return true;
        regions.put(region.getFirst(), region.getSecond());
        return true;
      }
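The two-argument fullScan above is what the master uses to build its startup picture of region locations. A minimal usage sketch, not part of the patch (the caller class and the table name are made up for illustration):

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;

public class MetaScanExample {
  // Scan .META. for every user region, skipping tables we know are disabled.
  // Regions with no assignment come back mapped to a null address.
  static Map<HRegionInfo, HServerAddress> currentAssignments(CatalogTracker tracker)
      throws IOException {
    Set<String> disabled = new TreeSet<String>();
    disabled.add("someDisabledTable");  // hypothetical disabled table
    return MetaReader.fullScan(tracker, disabled);
  }
}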
AssignmentManager.java
@@ -46,6 +46,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
@@ -54,7 +55,6 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
@@ -1106,23 +1106,33 @@ public class AssignmentManager extends ZooKeeperListener {
   * should be shutdown.
   */
  public void assignAllUserRegions() throws IOException {
    // First experiment at synchronous assignment
    // Simpler because just wait for no regions in transition

    // Scan META for all user regions; do not include offlined regions in list.
    List<HRegionInfo> allRegions =
      MetaScanner.listAllRegions(master.getConfiguration(), false);
    if (allRegions == null || allRegions.isEmpty()) return;
    Map<HServerInfo, List<HRegionInfo>> bulkPlan = null;

    // Get all available servers
    List<HServerInfo> servers = serverManager.getOnlineServersList();

    // Scan META for all user regions, skipping any disabled tables
    Map<HRegionInfo,HServerAddress> allRegions =
      MetaReader.fullScan(catalogTracker, disabledTables);
    if (allRegions == null || allRegions.isEmpty()) return;

    // Determine what type of assignment to do on startup
    boolean retainAssignment = master.getConfiguration().getBoolean(
        "hbase.master.startup.retainassign", true);

    if (retainAssignment) {
      // Reuse existing assignment info
      bulkPlan = LoadBalancer.retainAssignment(allRegions, servers);
    } else {
      // Generate a round-robin bulk assignment plan
      bulkPlan = LoadBalancer.roundRobinAssignment(
          new ArrayList<HRegionInfo>(allRegions.keySet()), servers);
    }

    LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
      servers.size() + " server(s)");

    // Generate a cluster startup region placement plan
    Map<HServerInfo, List<HRegionInfo>> bulkPlan =
      LoadBalancer.bulkAssignment(allRegions, servers);

    // Make a fixed thread count pool to run bulk assignments. Thought is that
    // if a 1k cluster, running 1k bulk concurrent assignment threads will kill
    // master, HDFS or ZK?
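The branch above is driven by one new configuration key, hbase.master.startup.retainassign, which defaults to true. A small sketch of how an operator or test could flip it off so the master falls back to round-robin placement; the helper class is illustrative, not part of the patch, and it assumes the HBaseConfiguration.create() factory of that era:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class StartupAssignConfig {
  // Turn off retained assignment; the master will then bulk-assign regions
  // round-robin across whatever servers are online at startup.
  public static Configuration withoutRetainedAssignment() {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("hbase.master.startup.retainassign", false);
    return conf;
  }
}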
LoadBalancer.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;

/**
@@ -337,11 +338,12 @@ public class LoadBalancer {
  }

  /**
   * Generates a bulk assignment plan to be used on cluster startup.
   *
   * Generates a bulk assignment plan to be used on cluster startup using a
   * simple round-robin assignment.
   * <p>
   * Takes a list of all the regions and all the servers in the cluster and
   * returns a map of each server to the regions that it should be assigned.
   *
   * <p>
   * Currently implemented as a round-robin assignment. Same invariant as
   * load balancing, all servers holding floor(avg) or ceiling(avg).
   *
@@ -352,7 +354,7 @@ public class LoadBalancer {
   * @return map of server to the regions it should take, or null if no
   *         assignment is possible (i.e. no regions or no servers)
   */
  public static Map<HServerInfo,List<HRegionInfo>> bulkAssignment(
  public static Map<HServerInfo,List<HRegionInfo>> roundRobinAssignment(
      List<HRegionInfo> regions, List<HServerInfo> servers) {
    if(regions.size() == 0 || servers.size() == 0) {
      return null;
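For readers who want the distribution invariant in isolation, here is a small standalone sketch of the same round-robin idea using plain strings instead of HRegionInfo/HServerInfo; the class and method names are illustrative only, not HBase API:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {
  // Distributes regions across servers so every server ends up with
  // floor(regions/servers) or ceil(regions/servers) entries.
  static Map<String, List<String>> roundRobin(List<String> regions, List<String> servers) {
    if (regions.isEmpty() || servers.isEmpty()) return null;
    Map<String, List<String>> plan = new HashMap<String, List<String>>();
    for (String s : servers) plan.put(s, new ArrayList<String>());
    for (int i = 0; i < regions.size(); i++) {
      plan.get(servers.get(i % servers.size())).add(regions.get(i));
    }
    return plan;
  }
}

With 100 regions over 10 servers every server gets exactly 10; with 101 regions, one server gets 11 and the rest get 10, which is the floor(avg)/ceiling(avg) invariant the javadoc refers to.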
@@ -374,6 +376,45 @@ public class LoadBalancer {
    return assignments;
  }

  /**
   * Generates a bulk assignment startup plan, attempting to reuse the existing
   * assignment information from META, but adjusting for the specified list of
   * available/online servers available for assignment.
   * <p>
   * Takes a map of all regions to their existing assignment from META. Also
   * takes a list of online servers for regions to be assigned to. Attempts to
   * retain all assignment, so in some instances initial assignment will not be
   * completely balanced.
   * <p>
   * Any leftover regions without an existing server to be assigned to will be
   * assigned randomly to available servers.
   * @param regions regions and existing assignment from meta
   * @param servers available servers
   * @return map of servers and regions to be assigned to them
   */
  public static Map<HServerInfo, List<HRegionInfo>> retainAssignment(
      Map<HRegionInfo, HServerAddress> regions, List<HServerInfo> servers) {
    Map<HServerInfo, List<HRegionInfo>> assignments =
      new TreeMap<HServerInfo, List<HRegionInfo>>();
    // Build a map of server addresses to server info so we can match things up
    Map<HServerAddress, HServerInfo> serverMap =
      new TreeMap<HServerAddress, HServerInfo>();
    for (HServerInfo server : servers) {
      serverMap.put(server.getServerAddress(), server);
      assignments.put(server, new ArrayList<HRegionInfo>());
    }
    for (Map.Entry<HRegionInfo, HServerAddress> region : regions.entrySet()) {
      HServerInfo server = serverMap.get(region.getValue());
      if (server != null) {
        assignments.get(server).add(region.getKey());
      } else {
        assignments.get(servers.get(rand.nextInt(assignments.size()))).add(
            region.getKey());
      }
    }
    return assignments;
  }

  /**
   * Find the block locations for all of the files for the specified region.
   *
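The core of retainAssignment is a lookup from each region's old server address to a currently online server. A simplified standalone sketch of the same retention logic, using plain strings where the server string doubles as its address (names and types here are illustrative, not HBase API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class RetainAssignmentSketch {
  private static final Random RAND = new Random();

  // Keep a region on the server that previously hosted it if that server's
  // address is still online; otherwise place it on a random online server.
  static Map<String, List<String>> retain(Map<String, String> regionToOldAddress,
      List<String> onlineServers) {
    Map<String, List<String>> plan = new HashMap<String, List<String>>();
    for (String s : onlineServers) plan.put(s, new ArrayList<String>());
    for (Map.Entry<String, String> e : regionToOldAddress.entrySet()) {
      String oldAddress = e.getValue();
      if (plan.containsKey(oldAddress)) {
        plan.get(oldAddress).add(e.getKey());   // old host still online: retain
      } else {
        String fallback = onlineServers.get(RAND.nextInt(onlineServers.size()));
        plan.get(fallback).add(e.getKey());     // old host gone: random placement
      }
    }
    return plan;
  }
}

The design choice, as the javadoc above spells out, is to prefer retention over balance: right after a restart the plan can be skewed, and evening things out is left to the regular balancer.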
TestLoadBalancer.java
@@ -19,6 +19,7 @@
 */
package org.apache.hadoop.hbase.master;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
@@ -28,11 +29,11 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -239,7 +240,7 @@ public class TestLoadBalancer {
    List<HRegionInfo> regions = randomRegions(mock[0]);
    List<HServerInfo> servers = randomServers(mock[1], 0);
    Map<HServerInfo,List<HRegionInfo>> assignments =
      LoadBalancer.bulkAssignment(regions, servers);
      LoadBalancer.roundRobinAssignment(regions, servers);
    float average = (float)regions.size()/servers.size();
    int min = (int)Math.floor(average);
    int max = (int)Math.ceil(average);
@@ -253,6 +254,80 @@ public class TestLoadBalancer {
    }
  }

  /**
   * Test the cluster startup bulk assignment which attempts to retain
   * assignment info.
   * @throws Exception
   */
  @Test
  public void testRetainAssignment() throws Exception {
    // Test simple case where all same servers are there
    List<HServerInfo> servers = randomServers(10, 10);
    List<HRegionInfo> regions = randomRegions(100);
    Map<HRegionInfo, HServerAddress> existing =
      new TreeMap<HRegionInfo, HServerAddress>();
    for (int i = 0; i < regions.size(); i++) {
      existing.put(regions.get(i),
          servers.get(i % servers.size()).getServerAddress());
    }
    Map<HServerInfo, List<HRegionInfo>> assignment =
      LoadBalancer.retainAssignment(existing, servers);
    assertRetainedAssignment(existing, servers, assignment);

    // Include two new servers that were not there before
    List<HServerInfo> servers2 = new ArrayList<HServerInfo>(servers);
    servers2.add(randomServer(10));
    servers2.add(randomServer(10));
    assignment = LoadBalancer.retainAssignment(existing, servers2);
    assertRetainedAssignment(existing, servers2, assignment);

    // Remove two of the servers that were previously there
    List<HServerInfo> servers3 = new ArrayList<HServerInfo>(servers);
    servers3.remove(servers3.size()-1);
    servers3.remove(servers3.size()-2);
    assignment = LoadBalancer.retainAssignment(existing, servers3);
    assertRetainedAssignment(existing, servers3, assignment);
  }

  /**
   * Asserts a valid retained assignment plan.
   * <p>
   * Must meet the following conditions:
   * <ul>
   * <li>Every input region has an assignment, and to an online server
   * <li>If a region had an existing assignment to a server with the same
   *     address as a currently online server, it will be assigned to it
   * </ul>
   * @param existing
   * @param servers
   * @param assignment
   */
  private void assertRetainedAssignment(
      Map<HRegionInfo, HServerAddress> existing, List<HServerInfo> servers,
      Map<HServerInfo, List<HRegionInfo>> assignment) {
    // Verify condition 1, every region assigned, and to online server
    Set<HServerInfo> onlineServerSet = new TreeSet<HServerInfo>(servers);
    Set<HRegionInfo> assignedRegions = new TreeSet<HRegionInfo>();
    for (Map.Entry<HServerInfo, List<HRegionInfo>> a : assignment.entrySet()) {
      assertTrue("Region assigned to server that was not listed as online",
          onlineServerSet.contains(a.getKey()));
      for (HRegionInfo r : a.getValue()) assignedRegions.add(r);
    }
    assertEquals(existing.size(), assignedRegions.size());

    // Verify condition 2, if server had existing assignment, must have same
    Set<HServerAddress> onlineAddresses = new TreeSet<HServerAddress>();
    for (HServerInfo s : servers) onlineAddresses.add(s.getServerAddress());
    for (Map.Entry<HServerInfo, List<HRegionInfo>> a : assignment.entrySet()) {
      for (HRegionInfo r : a.getValue()) {
        HServerAddress address = existing.get(r);
        if (address != null && onlineAddresses.contains(address)) {
          assertTrue(a.getKey().getServerAddress().equals(address));
        }
      }
    }
  }

  private String printStats(Map<HServerInfo, List<HRegionInfo>> servers) {
    int numServers = servers.size();
    int totalRegions = 0;
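To make condition 2 concrete: if a region was recorded in META at, say, address 10.0.0.1:60020 (a made-up example) and a server with that exact address appears in the online list, the plan must hand the region back to that server; if that address is gone, the region may land on any online server, which is exactly what the random fallback in retainAssignment produces. The test can presumably be run on its own through Maven surefire, e.g. mvn test -Dtest=TestLoadBalancer, assuming the trunk Maven build of that period.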