Addendum HBASE-16209: Add an ExponentialBackOffPolicy so that we spread out the timing of open region retries in AssignmentManager.

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Ashu Pachauri 2016-11-29 15:49:22 -08:00 committed by zhangduo
parent 55645c351e
commit cbdc9fcb8a
1 changed files with 29 additions and 16 deletions

View File

@ -3408,23 +3408,39 @@ public class AssignmentManager extends ZooKeeperListener {
return true; return true;
} }
void invokeAssign(HRegionInfo regionInfo) { void invokeAssignNow(HRegionInfo regionInfo, boolean forceNewPlan) {
threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, forceNewPlan));
}
void invokeAssignLater(HRegionInfo regionInfo, boolean forceNewPlan, long sleepMillis) {
scheduledThreadPoolExecutor.schedule(new DelayedAssignCallable(new AssignCallable(this,
regionInfo, forceNewPlan)), sleepMillis, TimeUnit.MILLISECONDS);
}
public void invokeAssign(HRegionInfo regionInfo) {
invokeAssign(regionInfo, true); invokeAssign(regionInfo, true);
} }
public void invokeAssign(HRegionInfo regionInfo, boolean newPlan) { public void invokeAssign(HRegionInfo regionInfo, boolean forceNewPlan) {
threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan)); if (failedOpenTracker.containsKey(regionInfo.getEncodedName())) {
// Sleep before reassigning if this region has failed to open before
long sleepTime = backoffPolicy.getBackoffTime(retryConfig,
getFailedAttempts(regionInfo.getEncodedName()));
invokeAssignLater(regionInfo, forceNewPlan, sleepTime);
} else {
// Immediately reassign if this region has never failed an open before
invokeAssignNow(regionInfo, forceNewPlan);
}
} }
public void invokeAssignLater(HRegionInfo regionInfo, long sleepMillis) { private int getFailedAttempts(String regionName) {
scheduledThreadPoolExecutor.schedule(new DelayedAssignCallable( AtomicInteger failedCount = failedOpenTracker.get(regionName);
new AssignCallable(this, regionInfo, true)), sleepMillis, TimeUnit.MILLISECONDS); if (failedCount != null) {
} return failedCount.get();
} else {
public void invokeAssignLaterOnFailure(HRegionInfo regionInfo) { // If we do not have a failed open tracker for a region assume it has never failed before
long sleepTime = backoffPolicy.getBackoffTime(retryConfig, return 0;
failedOpenTracker.get(regionInfo.getEncodedName()).get()); }
invokeAssignLater(regionInfo, sleepTime);
} }
void invokeUnAssign(HRegionInfo regionInfo) { void invokeUnAssign(HRegionInfo regionInfo) {
@ -3738,10 +3754,7 @@ public class AssignmentManager extends ZooKeeperListener {
} catch (HBaseIOException e) { } catch (HBaseIOException e) {
LOG.warn("Failed to get region plan", e); LOG.warn("Failed to get region plan", e);
} }
// Have the current thread sleep a bit before resubmitting the RPC request invokeAssign(hri, false);
long sleepTime = backoffPolicy.getBackoffTime(retryConfig,
failedOpenTracker.get(encodedName).get());
invokeAssignLater(hri, sleepTime);
} }
} }
} }