HBASE-7741 Don't use bulk assigner if assigning just several regions

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1443617 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jxiang 2013-02-07 17:31:47 +00:00
parent b198a50434
commit ce4595ad67
2 changed files with 57 additions and 13 deletions

View File

@ -157,6 +157,18 @@ public class AssignmentManager extends ZooKeeperListener {
private final RegionStates regionStates; private final RegionStates regionStates;
// The threshold to use bulk assigning. Using bulk assignment
// only if assigning at least this many regions to at least this
// many servers. If assigning fewer regions to fewer servers,
// bulk assigning may be not as efficient.
private final int bulkAssignThresholdRegions;
private final int bulkAssignThresholdServers;
// Should bulk assignment wait till all regions are assigned,
// or it is timed out? This is useful to measure bulk assignment
// performance, but not needed in most use cases.
private final boolean bulkAssignWaitTillAllAssigned;
/** /**
* Indicator that AssignmentManager has recovered the region states so * Indicator that AssignmentManager has recovered the region states so
* that ServerShutdownHandler can be fully enabled and re-assign regions * that ServerShutdownHandler can be fully enabled and re-assign regions
@ -206,6 +218,11 @@ public class AssignmentManager extends ZooKeeperListener {
this.metricsMaster = metricsMaster;// can be null only with tests. this.metricsMaster = metricsMaster;// can be null only with tests.
this.regionStates = new RegionStates(server, serverManager); this.regionStates = new RegionStates(server, serverManager);
this.bulkAssignWaitTillAllAssigned =
conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
int workers = conf.getInt("hbase.assignment.zkevent.workers", 20); int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
ThreadFactory threadFactory = Threads.newDaemonThreadFactory("hbase-am-zkevent-worker"); ThreadFactory threadFactory = Threads.newDaemonThreadFactory("hbase-am-zkevent-worker");
zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L, zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
@ -2115,11 +2132,8 @@ public class AssignmentManager extends ZooKeeperListener {
Map<ServerName, List<HRegionInfo>> bulkPlan = Map<ServerName, List<HRegionInfo>> bulkPlan =
balancer.retainAssignment(regions, servers); balancer.retainAssignment(regions, servers);
LOG.info("Bulk assigning " + regions.size() + " region(s) across " + assign(regions.size(), servers.size(),
servers.size() + " server(s), retainAssignment=true"); "retainAssignment=true", bulkPlan);
BulkAssigner ba = new GeneralBulkAssigner(this.server, bulkPlan, this);
ba.bulkAssign();
LOG.info("Bulk assigning done");
} }
/** /**
@ -2135,6 +2149,7 @@ public class AssignmentManager extends ZooKeeperListener {
if (regions == null || regions.isEmpty()) { if (regions == null || regions.isEmpty()) {
return; return;
} }
List<ServerName> servers = serverManager.createDestinationServersList(); List<ServerName> servers = serverManager.createDestinationServersList();
if (servers == null || servers.isEmpty()) { if (servers == null || servers.isEmpty()) {
throw new IOException("Found no destination server to assign region(s)"); throw new IOException("Found no destination server to assign region(s)");
@ -2144,13 +2159,36 @@ public class AssignmentManager extends ZooKeeperListener {
Map<ServerName, List<HRegionInfo>> bulkPlan Map<ServerName, List<HRegionInfo>> bulkPlan
= balancer.roundRobinAssignment(regions, servers); = balancer.roundRobinAssignment(regions, servers);
LOG.info("Bulk assigning " + regions.size() + " region(s) round-robin across " assign(regions.size(), servers.size(),
+ servers.size() + " server(s)"); "round-robin=true", bulkPlan);
}
// Use fixed count thread pool assigning. private void assign(int regions, int totalServers,
BulkAssigner ba = new GeneralBulkAssigner(this.server, bulkPlan, this); String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
ba.bulkAssign(); throws InterruptedException, IOException {
LOG.info("Bulk assigning done");
int servers = bulkPlan.size();
if (servers == 1 || (regions < bulkAssignThresholdRegions
&& servers < bulkAssignThresholdServers)) {
// Not use bulk assignment. This could be more efficient in small
// cluster, especially mini cluster for testing, so that tests won't time out
LOG.info("Not use bulk assigning since we are assigning only "
+ regions + " region(s) to " + servers + " server(s)");
for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
assign(plan.getKey(), plan.getValue());
}
} else {
LOG.info("Bulk assigning " + regions + " region(s) across "
+ totalServers + " server(s), " + message);
// Use fixed count thread pool assigning.
BulkAssigner ba = new GeneralBulkAssigner(
this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
ba.bulkAssign();
LOG.info("Bulk assigning done");
}
} }
/** /**

View File

@ -50,13 +50,15 @@ public class GeneralBulkAssigner extends BulkAssigner {
final Map<ServerName, List<HRegionInfo>> bulkPlan; final Map<ServerName, List<HRegionInfo>> bulkPlan;
final AssignmentManager assignmentManager; final AssignmentManager assignmentManager;
final boolean waitTillAllAssigned;
GeneralBulkAssigner(final Server server, GeneralBulkAssigner(final Server server,
final Map<ServerName, List<HRegionInfo>> bulkPlan, final Map<ServerName, List<HRegionInfo>> bulkPlan,
final AssignmentManager am) { final AssignmentManager am, final boolean waitTillAllAssigned) {
super(server); super(server);
this.bulkPlan = bulkPlan; this.bulkPlan = bulkPlan;
this.assignmentManager = am; this.assignmentManager = am;
this.waitTillAllAssigned = waitTillAllAssigned;
} }
@Override @Override
@ -133,6 +135,10 @@ public class GeneralBulkAssigner extends BulkAssigner {
regionInfoIterator.remove(); regionInfoIterator.remove();
} }
} }
if (!waitTillAllAssigned) {
// No need to wait, let assignment going on asynchronously
break;
}
if (!regionSet.isEmpty()) { if (!regionSet.isEmpty()) {
regionStates.waitForUpdate(100); regionStates.waitForUpdate(100);
} }
@ -142,7 +148,7 @@ public class GeneralBulkAssigner extends BulkAssigner {
long elapsedTime = System.currentTimeMillis() - startTime; long elapsedTime = System.currentTimeMillis() - startTime;
String status = "successfully"; String status = "successfully";
if (!regionSet.isEmpty()) { if (!regionSet.isEmpty()) {
status = "with " + regionSet.size() + " regions still not assigned yet"; status = "with " + regionSet.size() + " regions still in transition";
} }
LOG.debug("bulk assigning total " + regionCount + " regions to " LOG.debug("bulk assigning total " + regionCount + " regions to "
+ serverCount + " servers, took " + elapsedTime + "ms, " + status); + serverCount + " servers, took " + elapsedTime + "ms, " + status);