HBASE-13606 AssignmentManager.assign() is not sync in both path

This commit is contained in:
Matteo Bertozzi 2015-05-11 23:42:11 +01:00
parent c3f83a9eff
commit 30ecf990fe
2 changed files with 80 additions and 38 deletions

View File

@ -156,6 +156,7 @@ public class AssignmentManager {
// bulk assigning may be not as efficient.
private final int bulkAssignThresholdRegions;
private final int bulkAssignThresholdServers;
private final int bulkPerRegionOpenTimeGuesstimate;
// Should bulk assignment wait till all regions are assigned,
// or it is timed out? This is useful to measure bulk assignment
@ -194,7 +195,7 @@ public class AssignmentManager {
/** Listeners that are called on assignment events. */
private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
private RegionStateListener regionStateListener;
/**
@ -244,6 +245,8 @@ public class AssignmentManager {
conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
this.bulkPerRegionOpenTimeGuesstimate =
conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
this.metricsAssignmentManager = new MetricsAssignmentManager();
this.tableLockManager = tableLockManager;
@ -831,6 +834,18 @@ public class AssignmentManager {
}
}
}
// wait for assignment completion
ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
for (HRegionInfo region: regions) {
if (!region.getTable().isSystemTable()) {
userRegionSet.add(region);
}
}
if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
System.currentTimeMillis())) {
LOG.debug("some user regions are still in transition: " + userRegionSet);
}
LOG.debug("Bulk assigning done for " + destination);
return true;
} finally {
@ -1349,22 +1364,62 @@ public class AssignmentManager {
* If the region is already assigned, returns immediately. Otherwise, method
* blocks until the region is assigned.
* @param regionInfo region to wait on assignment for
* @return true if the region is assigned false otherwise.
* @throws InterruptedException
*/
public boolean waitForAssignment(HRegionInfo regionInfo)
throws InterruptedException {
while (!regionStates.isRegionOnline(regionInfo)) {
if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
|| this.server.isStopped()) {
return false;
}
ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
regionSet.add(regionInfo);
return waitForAssignment(regionSet, true, Long.MAX_VALUE);
}
// We should receive a notification, but it's
// better to have a timeout to recheck the condition here:
// it lowers the impact of a race condition if any
regionStates.waitForUpdate(100);
/**
* Waits until the specified region has completed assignment, or the deadline is reached.
*/
protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
final boolean waitTillAllAssigned, final int reassigningRegions,
final long minEndTime) throws InterruptedException {
long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
}
/**
* Waits until the specified region has completed assignment, or the deadline is reached.
* @param regionSet set of region to wait on. the set is modified and the assigned regions removed
* @param waitTillAllAssigned true if we should wait all the regions to be assigned
* @param deadline the timestamp after which the wait is aborted
* @return true if all the regions are assigned false otherwise.
* @throws InterruptedException
*/
protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
// We're not synchronizing on regionsInTransition now because we don't use any iterator.
while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
int failedOpenCount = 0;
Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
while (regionInfoIterator.hasNext()) {
HRegionInfo hri = regionInfoIterator.next();
if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
regionInfoIterator.remove();
} else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
failedOpenCount++;
}
}
if (!waitTillAllAssigned) {
// No need to wait, let assignment going on asynchronously
break;
}
if (!regionSet.isEmpty()) {
if (failedOpenCount == regionSet.size()) {
// all the regions we are waiting had an error on open.
break;
}
regionStates.waitForUpdate(100);
}
}
return true;
return regionSet.isEmpty();
}
/**
@ -1453,15 +1508,27 @@ public class AssignmentManager {
LOG.trace("Not using bulk assignment since we are assigning only " + regions +
" region(s) to " + servers + " server(s)");
}
// invoke assignment (async)
ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
for (HRegionInfo region: plan.getValue()) {
if (!regionStates.isRegionOnline(region)) {
invokeAssign(region);
if (!region.getTable().isSystemTable()) {
userRegionSet.add(region);
}
}
}
}
}
// wait for assignment completion
if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
System.currentTimeMillis())) {
LOG.debug("some user regions are still in transition: " + userRegionSet);
}
} else {
LOG.info("Bulk assigning " + regions + " region(s) across "
+ totalServers + " server(s), " + message);

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -35,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.RegionState.State;
/**
* Run bulk assign. Does one RCP per regionserver passing a
@ -118,31 +116,8 @@ public class GeneralBulkAssigner extends BulkAssigner {
if (!failedPlans.isEmpty() && !server.isStopped()) {
reassigningRegions = reassignFailedPlans();
}
Configuration conf = server.getConfiguration();
long perRegionOpenTimeGuesstimate =
conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime)
+ perRegionOpenTimeGuesstimate * (reassigningRegions + 1);
RegionStates regionStates = assignmentManager.getRegionStates();
// We're not synchronizing on regionsInTransition now because we don't use any iterator.
while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) {
Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
while (regionInfoIterator.hasNext()) {
HRegionInfo hri = regionInfoIterator.next();
if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
regionInfoIterator.remove();
}
}
if (!waitTillAllAssigned) {
// No need to wait, let assignment going on asynchronously
break;
}
if (!regionSet.isEmpty()) {
regionStates.waitForUpdate(100);
}
}
assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned,
reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime));
if (LOG.isDebugEnabled()) {
long elapsedTime = System.currentTimeMillis() - startTime;