diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index bf9b207daba..34c49634ff6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -214,6 +214,7 @@ public class AssignmentManager extends ZooKeeperListener { // bulk assigning may be not as efficient. private final int bulkAssignThresholdRegions; private final int bulkAssignThresholdServers; + private final int bulkPerRegionOpenTimeGuesstimate; // Should bulk assignment wait till all regions are assigned, // or it is timed out? This is useful to measure bulk assignment @@ -255,7 +256,7 @@ public class AssignmentManager extends ZooKeeperListener { /** Listeners that are called on assignment events. */ private List listeners = new CopyOnWriteArrayList(); - + private RegionStateListener regionStateListener; /** @@ -312,6 +313,8 @@ public class AssignmentManager extends ZooKeeperListener { conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false); this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7); this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3); + this.bulkPerRegionOpenTimeGuesstimate = + conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000); int workers = conf.getInt("hbase.assignment.zkevent.workers", 20); ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker"); @@ -1096,7 +1099,7 @@ public class AssignmentManager extends ZooKeeperListener { return; } // Handle OPENED by removing from transition and deleted zk node - regionState = + regionState = regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn); if (regionState != null) { failedOpenTracker.remove(encodedName); // reset the count, if any @@ -1788,6 +1791,18 @@ public class AssignmentManager extends ZooKeeperListener { } } } + + // wait for assignment completion + ArrayList userRegionSet = new ArrayList(regions.size()); + for (HRegionInfo region: regions) { + if (!region.getTable().isSystemTable()) { + userRegionSet.add(region); + } + } + if (!waitForAssignment(userRegionSet, true, userRegionSet.size(), + System.currentTimeMillis())) { + LOG.debug("some user regions are still in transition: " + userRegionSet); + } LOG.debug("Bulk assigning done for " + destination); return true; } finally { @@ -2617,22 +2632,62 @@ public class AssignmentManager extends ZooKeeperListener { * If the region is already assigned, returns immediately. Otherwise, method * blocks until the region is assigned. * @param regionInfo region to wait on assignment for + * @return true if the region is assigned false otherwise. * @throws InterruptedException */ public boolean waitForAssignment(HRegionInfo regionInfo) throws InterruptedException { - while (!regionStates.isRegionOnline(regionInfo)) { - if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN) - || this.server.isStopped()) { - return false; - } + ArrayList regionSet = new ArrayList(1); + regionSet.add(regionInfo); + return waitForAssignment(regionSet, true, Long.MAX_VALUE); + } - // We should receive a notification, but it's - // better to have a timeout to recheck the condition here: - // it lowers the impact of a race condition if any - regionStates.waitForUpdate(100); + /** + * Waits until the specified region has completed assignment, or the deadline is reached. + */ + protected boolean waitForAssignment(final Collection regionSet, + final boolean waitTillAllAssigned, final int reassigningRegions, + final long minEndTime) throws InterruptedException { + long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1); + return waitForAssignment(regionSet, waitTillAllAssigned, deadline); + } + + /** + * Waits until the specified region has completed assignment, or the deadline is reached. + * @param regionSet set of region to wait on. the set is modified and the assigned regions removed + * @param waitTillAllAssigned true if we should wait all the regions to be assigned + * @param deadline the timestamp after which the wait is aborted + * @return true if all the regions are assigned false otherwise. + * @throws InterruptedException + */ + protected boolean waitForAssignment(final Collection regionSet, + final boolean waitTillAllAssigned, final long deadline) throws InterruptedException { + // We're not synchronizing on regionsInTransition now because we don't use any iterator. + while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) { + int failedOpenCount = 0; + Iterator regionInfoIterator = regionSet.iterator(); + while (regionInfoIterator.hasNext()) { + HRegionInfo hri = regionInfoIterator.next(); + if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri, + State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) { + regionInfoIterator.remove(); + } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) { + failedOpenCount++; + } + } + if (!waitTillAllAssigned) { + // No need to wait, let assignment going on asynchronously + break; + } + if (!regionSet.isEmpty()) { + if (failedOpenCount == regionSet.size()) { + // all the regions we are waiting had an error on open. + break; + } + regionStates.waitForUpdate(100); + } } - return true; + return regionSet.isEmpty(); } /** @@ -2725,15 +2780,27 @@ public class AssignmentManager extends ZooKeeperListener { LOG.trace("Not using bulk assignment since we are assigning only " + regions + " region(s) to " + servers + " server(s)"); } + + // invoke assignment (async) + ArrayList userRegionSet = new ArrayList(regions); for (Map.Entry> plan: bulkPlan.entrySet()) { if (!assign(plan.getKey(), plan.getValue())) { for (HRegionInfo region: plan.getValue()) { if (!regionStates.isRegionOnline(region)) { invokeAssign(region); + if (!region.getTable().isSystemTable()) { + userRegionSet.add(region); + } } } } } + + // wait for assignment completion + if (!waitForAssignment(userRegionSet, true, userRegionSet.size(), + System.currentTimeMillis())) { + LOG.debug("some user regions are still in transition: " + userRegionSet); + } } else { LOG.info("Bulk assigning " + regions + " region(s) across " + totalServers + " server(s), " + message); @@ -3044,11 +3111,11 @@ public class AssignmentManager extends ZooKeeperListener { if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) { LOG.info("Server " + serverName + " isn't online. SSH will handle this"); - continue; + continue; } HRegionInfo regionInfo = regionState.getRegion(); State state = regionState.getState(); - + switch (state) { case CLOSED: invokeAssign(regionInfo); @@ -3060,7 +3127,7 @@ public class AssignmentManager extends ZooKeeperListener { retrySendRegionClose(regionState); break; case FAILED_CLOSE: - case FAILED_OPEN: + case FAILED_OPEN: invokeUnAssign(regionInfo); break; default: @@ -4270,7 +4337,7 @@ public class AssignmentManager extends ZooKeeperListener { getSnapShotOfAssignment(Collection infos) { return getRegionStates().getRegionAssignments(infos); } - + void setRegionStateListener(RegionStateListener listener) { this.regionStateListener = listener; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java index 356f4afeeb0..43ea52345cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master; import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -35,7 +34,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.RegionState.State; /** * Run bulk assign. Does one RCP per regionserver passing a @@ -118,31 +116,8 @@ public class GeneralBulkAssigner extends BulkAssigner { if (!failedPlans.isEmpty() && !server.isStopped()) { reassigningRegions = reassignFailedPlans(); } - - Configuration conf = server.getConfiguration(); - long perRegionOpenTimeGuesstimate = - conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000); - long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime) - + perRegionOpenTimeGuesstimate * (reassigningRegions + 1); - RegionStates regionStates = assignmentManager.getRegionStates(); - // We're not synchronizing on regionsInTransition now because we don't use any iterator. - while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) { - Iterator regionInfoIterator = regionSet.iterator(); - while (regionInfoIterator.hasNext()) { - HRegionInfo hri = regionInfoIterator.next(); - if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri, - State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) { - regionInfoIterator.remove(); - } - } - if (!waitTillAllAssigned) { - // No need to wait, let assignment going on asynchronously - break; - } - if (!regionSet.isEmpty()) { - regionStates.waitForUpdate(100); - } - } + assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned, + reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime)); if (LOG.isDebugEnabled()) { long elapsedTime = System.currentTimeMillis() - startTime;