diff --git a/CHANGES.txt b/CHANGES.txt
index 8aaa6b8e2bd..c7457b10ce4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -530,6 +530,8 @@ Release 0.21.0 - Unreleased
    HBASE-3015 recovered.edits files not deleted if it only contain edits
               that have already been flushed; hurts perf for all future
               opens of the region
+   HBASE-3018 Bulk assignment on startup runs serially through the cluster
+              servers assigning in bulk to one at a time
 
   IMPROVEMENTS
    HBASE-1760 Cleanup TODOs in HTable
diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 28276b24b21..d1a061c7ec5 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -118,8 +118,6 @@ public class AssignmentManager extends ZooKeeperListener {
 
   private final SortedMap regions = new TreeMap();
 
-  private final ReentrantLock assignLock = new ReentrantLock();
-
   private final ExecutorService executorService;
 
   /**
@@ -493,24 +491,18 @@ public class AssignmentManager extends ZooKeeperListener {
     // Grab the state of this region and synchronize on it
     String encodedName = region.getEncodedName();
     RegionState state;
-    // This assignLock is used bridging the two synchronization blocks. Once
-    // we've made it into the 'state' synchronization block, then we can let
-    // go of this lock. There must be a better construct that this -- St.Ack 20100811
-    this.assignLock.lock();
-    try {
-      synchronized (regionsInTransition) {
-        state = regionsInTransition.get(encodedName);
-        if(state == null) {
-          state = new RegionState(region, RegionState.State.OFFLINE);
-          regionsInTransition.put(encodedName, state);
-        }
+    synchronized (regionsInTransition) {
+      state = regionsInTransition.get(encodedName);
+      if (state == null) {
+        state = new RegionState(region, RegionState.State.OFFLINE);
+        regionsInTransition.put(encodedName, state);
       }
-      synchronized(state) {
-        this.assignLock.unlock();
-        assign(state);
-      }
-    } finally {
-      if (this.assignLock.isHeldByCurrentThread()) this.assignLock.unlock();
+    }
+    // This here gap between synchronizations looks like a hole but it should
+    // be ok because the assign below would protect against being called with
+    // a state instance that is not in the right 'state' -- St.Ack 20100920.
+    synchronized (state) {
+      assign(state);
     }
   }
 
@@ -519,9 +511,9 @@
    * @param state
    */
   private void assign(final RegionState state) {
-    if(!state.isClosed() && !state.isOffline()) {
+    if (!state.isClosed() && !state.isOffline()) {
       LOG.info("Attempting to assign region but it is in transition and in " +
-          "an unexpected state:" + state);
+        "an unexpected state:" + state);
       return;
     } else {
       state.update(RegionState.State.OFFLINE);
@@ -675,32 +667,21 @@
     // Scan META for all user regions
     List<HRegionInfo> allRegions = MetaScanner.listAllRegions(master.getConfiguration());
-    if (allRegions == null || allRegions.isEmpty()) {
-      return;
-    }
+    if (allRegions == null || allRegions.isEmpty()) return;
 
     // Get all available servers
     List<HServerInfo> servers = serverManager.getOnlineServersList();
-
-    LOG.info("Assigning " + allRegions.size() + " regions across " + servers.size() +
-        " servers");
+    LOG.info("Assigning " + allRegions.size() + " region(s) across " +
+      servers.size() + " server(s)");
 
     // Generate a cluster startup region placement plan
-    Map<HServerInfo,List<HRegionInfo>> bulkPlan =
+    Map<HServerInfo, List<HRegionInfo>> bulkPlan =
       LoadBalancer.bulkAssignment(allRegions, servers);
-    // For each server, create OFFLINE nodes and send OPEN RPCs
-    for (Map.Entry<HServerInfo,List<HRegionInfo>> entry : bulkPlan.entrySet()) {
-      HServerInfo server = entry.getKey();
-      List<HRegionInfo> regions = entry.getValue();
-      LOG.debug("Assigning " + regions.size() + " regions to " + server);
-      for (HRegionInfo region : regions) {
-        LOG.debug("Assigning " + region.getRegionNameAsString() + " to " + server);
-        String regionName = region.getEncodedName();
-        RegionPlan plan = new RegionPlan(region, null,server);
-        regionPlans.put(regionName, plan);
-        assign(region);
-      }
+    // Now start a thread per server to run assignment.
+    for (Map.Entry<HServerInfo, List<HRegionInfo>> entry: bulkPlan.entrySet()) {
+      Thread t = new BulkAssignServer(entry.getKey(), entry.getValue(), this.master);
+      t.start();
     }
 
     // Wait for no regions to be in transition
@@ -714,6 +695,46 @@
     LOG.info("All user regions have been assigned");
   }
 
+  /**
+   * Class to run bulk assign to a single server.
+   */
+  class BulkAssignServer extends Thread {
+    private final List<HRegionInfo> regions;
+    private final HServerInfo server;
+    private final Stoppable stopper;
+
+    BulkAssignServer(final HServerInfo server,
+        final List<HRegionInfo> regions, final Stoppable stopper) {
+      super("serverassign-" + server.getServerName());
+      setDaemon(true);
+      this.server = server;
+      this.regions = regions;
+      this.stopper = stopper;
+    }
+
+    @Override
+    public void run() {
+      // Insert a plan for each region with 'server' as the target regionserver.
+      // Below, we run through regions one at a time. The call to assign will
+      // move the region into the regionsInTransition which starts up a timer.
+      // if the region is not out of the regionsInTransition by a certain time,
+      // it will be reassigned. We don't want that to happen. So, do it this
+      // way a region at a time for now. Presumably the regionserver will put
+      // up a back pressure if opening a region takes time which is good since
+      // this will block our adding new regions to regionsInTransition. Later
+      // make it so we can send over a lump of regions in one rpc with the
+      // regionserver on remote side tickling zk on a period to prevent our
+      // regionsInTransition timing out. Currently its not possible given the
+      // Executor architecture on the regionserver side. St.Ack 20100920.
+      for (HRegionInfo region : regions) {
+        LOG.debug("Assigning " + region.getRegionNameAsString() + " to " + this.server);
+        regionPlans.put(region.getEncodedName(), new RegionPlan(region, null, server));
+        assign(region);
+        if (this.stopper.isStopped()) break;
+      }
+    }
+  }
+
   private void rebuildUserRegions() throws IOException {
     Map allRegions = MetaReader.fullScan(catalogTracker);
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 654e0914052..e9d77514595 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1908,7 +1908,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   public void openRegion(HRegionInfo region) {
     LOG.info("Received request to open region: " +
       region.getRegionNameAsString());
-    if(region.isRootRegion()) {
+    if (region.isRootRegion()) {
       this.service.submit(new OpenRootHandler(this, this, region));
     } else if(region.isMetaRegion()) {
       this.service.submit(new OpenMetaHandler(this, this, region));
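
Note on the pattern above (not part of the patch): the change replaces the master's single serial assignment loop with one short-lived daemon thread per regionserver, each walking its slice of the LoadBalancer.bulkAssignment() plan one region at a time and checking a stop flag between regions. The sketch below is a minimal, self-contained model of that fan-out, not HBase code: the class name BulkAssignDemo, the string server/region names, and the AtomicBoolean standing in for the master's Stoppable are invented for illustration, and regionPlans.put()/assign() are reduced to a print statement.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class BulkAssignDemo {

  // Stand-in for the master's Stoppable flag.
  static final AtomicBoolean stopped = new AtomicBoolean(false);

  // One thread per server; works through that server's regions serially.
  static class BulkAssignServer extends Thread {
    private final String server;
    private final List<String> regions;

    BulkAssignServer(String server, List<String> regions) {
      super("serverassign-" + server);
      setDaemon(true);
      this.server = server;
      this.regions = regions;
    }

    @Override
    public void run() {
      for (String region : regions) {
        // Stand-in for regionPlans.put(...) followed by assign(region).
        System.out.println(getName() + ": assigning " + region + " to " + server);
        if (stopped.get()) break;  // bail out if the master is stopping
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // A tiny "bulk plan": server -> regions, shaped like the output of
    // LoadBalancer.bulkAssignment(allRegions, servers).
    Map<String, List<String>> bulkPlan = new HashMap<>();
    bulkPlan.put("rs1", Arrays.asList("regionA", "regionB"));
    bulkPlan.put("rs2", Arrays.asList("regionC", "regionD"));

    // Fan out: start one thread per server so servers are assigned in parallel.
    List<Thread> threads = new ArrayList<>();
    for (Map.Entry<String, List<String>> entry : bulkPlan.entrySet()) {
      Thread t = new BulkAssignServer(entry.getKey(), entry.getValue());
      t.start();
      threads.add(t);
    }
    // The real master waits for regionsInTransition to drain; here we just join.
    for (Thread t : threads) {
      t.join();
    }
  }
}

Each per-server thread keeps the region-at-a-time pacing (so regionsInTransition timers are not started long before the regionserver can act on them), while all servers make progress in parallel instead of one after another.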