From 30ecf990fe2a343e418eedcffd1d8d5c94ab1fd3 Mon Sep 17 00:00:00 2001 From: Matteo Bertozzi Date: Mon, 11 May 2015 23:42:11 +0100 Subject: [PATCH] HBASE-13606 AssignmentManager.assign() is not sync in both path --- .../hbase/master/AssignmentManager.java | 89 ++++++++++++++++--- .../hbase/master/GeneralBulkAssigner.java | 29 +----- 2 files changed, 80 insertions(+), 38 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 4a1e71fcb3b..eae99991474 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -156,6 +156,7 @@ public class AssignmentManager { // bulk assigning may be not as efficient. private final int bulkAssignThresholdRegions; private final int bulkAssignThresholdServers; + private final int bulkPerRegionOpenTimeGuesstimate; // Should bulk assignment wait till all regions are assigned, // or it is timed out? This is useful to measure bulk assignment @@ -194,7 +195,7 @@ public class AssignmentManager { /** Listeners that are called on assignment events. 
*/ private List listeners = new CopyOnWriteArrayList(); - + private RegionStateListener regionStateListener; /** @@ -244,6 +245,8 @@ public class AssignmentManager { conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false); this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7); this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3); + this.bulkPerRegionOpenTimeGuesstimate = + conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000); this.metricsAssignmentManager = new MetricsAssignmentManager(); this.tableLockManager = tableLockManager; @@ -831,6 +834,18 @@ public class AssignmentManager { } } } + + // wait for assignment completion + ArrayList userRegionSet = new ArrayList(regions.size()); + for (HRegionInfo region: regions) { + if (!region.getTable().isSystemTable()) { + userRegionSet.add(region); + } + } + if (!waitForAssignment(userRegionSet, true, userRegionSet.size(), + System.currentTimeMillis())) { + LOG.debug("some user regions are still in transition: " + userRegionSet); + } LOG.debug("Bulk assigning done for " + destination); return true; } finally { @@ -1349,22 +1364,62 @@ public class AssignmentManager { * If the region is already assigned, returns immediately. Otherwise, method * blocks until the region is assigned. * @param regionInfo region to wait on assignment for + * @return true if the region is assigned false otherwise. 
* @throws InterruptedException */ public boolean waitForAssignment(HRegionInfo regionInfo) throws InterruptedException { - while (!regionStates.isRegionOnline(regionInfo)) { - if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN) - || this.server.isStopped()) { - return false; - } + ArrayList regionSet = new ArrayList(1); + regionSet.add(regionInfo); + return waitForAssignment(regionSet, true, Long.MAX_VALUE); + } - // We should receive a notification, but it's - // better to have a timeout to recheck the condition here: - // it lowers the impact of a race condition if any - regionStates.waitForUpdate(100); + /** + * Waits for the regions to be assigned, until minEndTime plus a per-region open-time budget. + */ + protected boolean waitForAssignment(final Collection regionSet, + final boolean waitTillAllAssigned, final int reassigningRegions, + final long minEndTime) throws InterruptedException { + long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1L); + return waitForAssignment(regionSet, waitTillAllAssigned, deadline); + } + + /** + * Waits until the specified regions have completed assignment, or the deadline is reached. + * @param regionSet set of region to wait on. the set is modified and the assigned regions removed + * @param waitTillAllAssigned true if we should wait all the regions to be assigned + * @param deadline the timestamp after which the wait is aborted + * @return true if all the regions are assigned false otherwise. + * @throws InterruptedException + */ + protected boolean waitForAssignment(final Collection regionSet, + final boolean waitTillAllAssigned, final long deadline) throws InterruptedException { + // We're not synchronizing on regionsInTransition now because we don't use any iterator. 
+ while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) { + int failedOpenCount = 0; + Iterator regionInfoIterator = regionSet.iterator(); + while (regionInfoIterator.hasNext()) { + HRegionInfo hri = regionInfoIterator.next(); + if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri, + State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) { + regionInfoIterator.remove(); + } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) { + failedOpenCount++; + } + } + if (!waitTillAllAssigned) { + // No need to wait, let assignment going on asynchronously + break; + } + if (!regionSet.isEmpty()) { + if (failedOpenCount == regionSet.size()) { + // all the regions we are waiting had an error on open. + break; + } + regionStates.waitForUpdate(100); + } } - return true; + return regionSet.isEmpty(); } /** @@ -1453,15 +1508,27 @@ public class AssignmentManager { LOG.trace("Not using bulk assignment since we are assigning only " + regions + " region(s) to " + servers + " server(s)"); } + + // invoke assignment (async) + ArrayList userRegionSet = new ArrayList(regions); for (Map.Entry> plan: bulkPlan.entrySet()) { if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) { for (HRegionInfo region: plan.getValue()) { if (!regionStates.isRegionOnline(region)) { invokeAssign(region); + if (!region.getTable().isSystemTable()) { + userRegionSet.add(region); + } } } } } + + // wait for assignment completion + if (!waitForAssignment(userRegionSet, true, userRegionSet.size(), + System.currentTimeMillis())) { + LOG.debug("some user regions are still in transition: " + userRegionSet); + } } else { LOG.info("Bulk assigning " + regions + " region(s) across " + totalServers + " server(s), " + message); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java index 356f4afeeb0..43ea52345cb 
100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master; import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -35,7 +34,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.RegionState.State; /** * Run bulk assign. Does one RCP per regionserver passing a @@ -118,31 +116,8 @@ public class GeneralBulkAssigner extends BulkAssigner { if (!failedPlans.isEmpty() && !server.isStopped()) { reassigningRegions = reassignFailedPlans(); } - - Configuration conf = server.getConfiguration(); - long perRegionOpenTimeGuesstimate = - conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000); - long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime) - + perRegionOpenTimeGuesstimate * (reassigningRegions + 1); - RegionStates regionStates = assignmentManager.getRegionStates(); - // We're not synchronizing on regionsInTransition now because we don't use any iterator. 
- while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) { - Iterator regionInfoIterator = regionSet.iterator(); - while (regionInfoIterator.hasNext()) { - HRegionInfo hri = regionInfoIterator.next(); - if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri, - State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) { - regionInfoIterator.remove(); - } - } - if (!waitTillAllAssigned) { - // No need to wait, let assignment going on asynchronously - break; - } - if (!regionSet.isEmpty()) { - regionStates.waitForUpdate(100); - } - } + assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned, + reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime)); if (LOG.isDebugEnabled()) { long elapsedTime = System.currentTimeMillis() - startTime;