Back port HBASE-16209 Provide an exponential back off policy in reattempting failed region opens
Signed-off-by: Elliott Clark <eclark@apache.org>
parent 08b9e6bee0
commit 7c97acf6e3
hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon

@@ -30,6 +30,8 @@ java.util.ArrayList;
 java.util.Set;
 java.util.HashSet;
 java.lang.Integer;
+java.util.Map;
+java.util.concurrent.atomic.AtomicInteger;
 </%import>
 <%args>
 AssignmentManager assignmentManager;
@@ -37,7 +39,9 @@ int limit = Integer.MAX_VALUE;
 </%args>

 <%java SortedSet<RegionState> rit = assignmentManager
-  .getRegionStates().getRegionsInTransitionOrderedByTimestamp(); %>
+  .getRegionStates().getRegionsInTransitionOrderedByTimestamp();
+  Map<String, AtomicInteger> failedRegionTracker = assignmentManager.getFailedOpenTracker();
+%>

 <%if !rit.isEmpty() %>
 <%java>
@@ -92,7 +96,7 @@ int numOfPages = (int) Math.ceil(numOfRITs * 1.0 / ritsPerPage);
 <div class="tab-pane" id="tab_rits<% (recordItr / ritsPerPage) + 1 %>">
 </%if>
 <table class="table table-striped" style="margin-bottom:0px;"><tr><th>Region</th>
-<th>State</th><th>RIT time (ms)</th></tr>
+<th>State</th><th>RIT time (ms)</th><th>Retries </th></tr>
 </%if>

 <%if ritsOverThreshold.contains(rs.getRegion().getEncodedName()) %>
@@ -102,9 +106,21 @@ int numOfPages = (int) Math.ceil(numOfRITs * 1.0 / ritsPerPage);
 <%else>
 <tr>
 </%if>
+<%java>
+  String retryStatus = "0";
+  String name = rs.getRegion().getEncodedName();
+  RegionState state = assignmentManager.getState(name);
+  AtomicInteger numOpenRetries = failedRegionTracker.get(name);
+  if (numOpenRetries != null ) {
+    retryStatus = Integer.toString(numOpenRetries.get());
+  } else if (state.getState() == RegionState.State.FAILED_OPEN) {
+    retryStatus = "Failed";
+  }
+</%java>
 <td><% rs.getRegion().getEncodedName() %></td><td>
 <% rs.toDescriptiveString() %></td>
 <td><% (currentTime - rs.getStamp()) %> </td>
+<td><% retryStatus %> </td>
 <%java recordItr++; %>
 <%if (recordItr % ritsPerPage) == 0 %>
 </table>
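The new Retries column is filled per region-in-transition: while the open is still being retried, the count comes from the AssignmentManager's failedOpenTracker (encoded region name mapped to an AtomicInteger); when the tracker has no entry and the region state is FAILED_OPEN, the template reports "Failed", which suggests the tracker entry is cleared once retries are exhausted. A minimal standalone sketch of that decision logic, with hypothetical plain-Java stand-ins for the template variables (RetryColumn and isFailedOpen are mine, not HBase API):

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: mirrors the template's <%java> block above. "isFailedOpen"
// stands in for state.getState() == RegionState.State.FAILED_OPEN.
final class RetryColumn {
  static String retryStatus(Map<String, AtomicInteger> failedRegionTracker,
      String encodedName, boolean isFailedOpen) {
    AtomicInteger numOpenRetries = failedRegionTracker.get(encodedName);
    if (numOpenRetries != null) {
      return Integer.toString(numOpenRetries.get()); // still retrying: show the count
    }
    return isFailedOpen ? "Failed" : "0";            // terminal failure vs. no failures yet
  }
}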
hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java

@@ -37,6 +37,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -100,6 +101,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Triple;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
@@ -200,6 +202,7 @@ public class AssignmentManager extends ZooKeeperListener {

   //Thread pool executor service for timeout monitor
   private java.util.concurrent.ExecutorService threadPoolExecutorService;
+  private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

   // A bunch of ZK events workers. Each is a single thread executor service
   private final java.util.concurrent.ExecutorService zkEventWorkers;
@@ -264,6 +267,8 @@ public class AssignmentManager extends ZooKeeperListener {
     NOT_HOSTING_REGION, HOSTING_REGION, UNKNOWN,
   }

+  private RetryCounter.BackoffPolicy backoffPolicy;
+  private RetryCounter.RetryConfig retryConfig;
   /**
    * Constructs a new assignment manager.
    *
@@ -309,8 +314,13 @@ public class AssignmentManager extends ZooKeeperListener {
       "hbase.meta.assignment.retry.sleeptime", 1000l);
     this.balancer = balancer;
     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
+
     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
-      maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
+        maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
+
+    this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
+        Threads.newDaemonThreadFactory("AM.Scheduler"));
+
     this.regionStates = new RegionStates(
       server, tableStateManager, serverManager, regionStateStore);
@@ -329,6 +339,22 @@ public class AssignmentManager extends ZooKeeperListener {

     this.metricsAssignmentManager = new MetricsAssignmentManager();
     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
+    // Configurations for retrying opening a region on receiving a FAILED_OPEN
+    this.retryConfig = new RetryCounter.RetryConfig();
+    this.retryConfig.setSleepInterval(conf.getLong("hbase.assignment.retry.sleep.initial", 0l));
+    // Set the max time limit to the initial sleep interval so we use a constant time sleep strategy
+    // if the user does not set a max sleep time
+    this.retryConfig.setMaxSleepTime(conf.getLong("hbase.assignment.retry.sleep.max",
+      retryConfig.getSleepInterval()));
+    this.backoffPolicy = getBackoffPolicy();
   }

+  /**
+   * Returns the backoff policy used for Failed Region Open retries
+   * @return the backoff policy used for Failed Region Open retries
+   */
+  RetryCounter.BackoffPolicy getBackoffPolicy() {
+    return new RetryCounter.ExponentialBackoffPolicyWithLimit();
+  }
+
   MetricsAssignmentManager getAssignmentManagerMetrics() {
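The policy and config classes come from org.apache.hadoop.hbase.util.RetryCounter, which this patch imports and configures above. A minimal sketch of how the two new settings interact, assuming ExponentialBackoffPolicyWithLimit behaves as its name suggests (roughly min(initial * 2^attempts, max)); the class name BackoffDemo and the sample values are mine, not part of the patch:

import org.apache.hadoop.hbase.util.RetryCounter;

public class BackoffDemo {
  public static void main(String[] args) {
    RetryCounter.RetryConfig config = new RetryCounter.RetryConfig();
    config.setSleepInterval(1000L); // hbase.assignment.retry.sleep.initial (default 0)
    config.setMaxSleepTime(60000L); // hbase.assignment.retry.sleep.max (defaults to the initial interval)
    RetryCounter.BackoffPolicy policy = new RetryCounter.ExponentialBackoffPolicyWithLimit();
    // Expected to print roughly 1000, 2000, 4000, ... capped at 60000 ms.
    for (int failedAttempts = 0; failedAttempts < 10; failedAttempts++) {
      System.out.println("attempt " + failedAttempts + ": sleep "
          + policy.getBackoffTime(config, failedAttempts) + " ms");
    }
  }
}

With both settings left at their defaults the initial interval is 0 ms, so the computed backoff is always 0 and the pre-patch behavior of reassigning immediately is preserved; setting only the initial interval yields a constant sleep, since the max defaults to the initial value.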
@@ -3390,6 +3416,17 @@ public class AssignmentManager extends ZooKeeperListener {
     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
   }

+  public void invokeAssignLater(HRegionInfo regionInfo, long sleepMillis) {
+    scheduledThreadPoolExecutor.schedule(new DelayedAssignCallable(
+        new AssignCallable(this, regionInfo, true)), sleepMillis, TimeUnit.MILLISECONDS);
+  }
+
+  public void invokeAssignLaterOnFailure(HRegionInfo regionInfo) {
+    long sleepTime = backoffPolicy.getBackoffTime(retryConfig,
+        failedOpenTracker.get(regionInfo.getEncodedName()).get());
+    invokeAssignLater(regionInfo, sleepTime);
+  }
+
   void invokeUnAssign(HRegionInfo regionInfo) {
     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
   }
@@ -3701,7 +3738,10 @@ public class AssignmentManager extends ZooKeeperListener {
         } catch (HBaseIOException e) {
           LOG.warn("Failed to get region plan", e);
         }
-        invokeAssign(hri, false);
+        // Have the current thread sleep a bit before resubmitting the RPC request
+        long sleepTime = backoffPolicy.getBackoffTime(retryConfig,
+            failedOpenTracker.get(encodedName).get());
+        invokeAssignLater(hri, sleepTime);
       }
     }
   }
@@ -4233,6 +4273,10 @@ public class AssignmentManager extends ZooKeeperListener {
     return replicasToClose;
   }

+  public Map<String, AtomicInteger> getFailedOpenTracker() {return failedOpenTracker;}
+
+  public RegionState getState(String encodedName) {return regionStates.getRegionState(encodedName);}
+
   /**
    * A region is offline. The new state should be the specified one,
    * if not null. If the specified state is null, the new state is Offline.
@@ -4454,4 +4498,16 @@ public class AssignmentManager extends ZooKeeperListener {
   void setRegionStateListener(RegionStateListener listener) {
     this.regionStateListener = listener;
   }
+
+  private class DelayedAssignCallable implements Runnable {
+    Callable callable;
+    public DelayedAssignCallable(Callable callable) {
+      this.callable = callable;
+    }
+
+    @Override
+    public void run() {
+      threadPoolExecutorService.submit(callable);
+    }
+  }
 }
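DelayedAssignCallable exists so the single AM.Scheduler thread never runs an assignment itself: when the delay fires, it only re-submits the wrapped AssignCallable to the bounded AM. worker pool. So despite the carried-over comment in the earlier FAILED_OPEN hunk, no thread actually sleeps; the retry is simply scheduled. A self-contained sketch of the same pattern with illustrative names (DelayedHandoff and runLater are mine, not HBase API):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayedHandoff {
  // One thread only tracks timers; a bounded pool does the actual work.
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final ExecutorService workers = Executors.newFixedThreadPool(30);

  public void runLater(final Callable<?> task, long delayMillis) {
    scheduler.schedule(new Runnable() {
      // When the delay expires, the scheduler thread only enqueues the task,
      // so one slow task can never hold up other pending timers.
      @Override
      public void run() {
        workers.submit(task);
      }
    }, delayMillis, TimeUnit.MILLISECONDS);
  }
}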
hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java

@@ -69,7 +69,7 @@ public class RegionStates {
   private static final Log LOG = LogFactory.getLog(RegionStates.class);

   public final static RegionStateStampComparator REGION_STATE_COMPARATOR =
-    new RegionStateStampComparator();
+      new RegionStateStampComparator();

   // This comparator sorts the RegionStates by time stamp then Region name.
   // Comparing by timestamp alone can lead us to discard different RegionStates that happen
hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java

@@ -103,6 +103,6 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInfo {
       regionInfo, RegionState.State.CLOSED);
     // This below has to do w/ online enable/disable of a table
     assignmentManager.removeClosedRegion(regionInfo);
-    assignmentManager.assign(regionInfo, true);
+    assignmentManager.invokeAssignLaterOnFailure(regionInfo);
   }
 }
hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterStatusServlet.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.master;

 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;

 import java.io.IOException;
 import java.io.StringWriter;
@@ -172,6 +173,7 @@ public class TestMasterStatusServlet {
     Mockito.doReturn(rs).when(am).getRegionStates();
-    Mockito.doReturn(regionsInTransition).when(rs).getRegionsInTransition();
+    Mockito.doReturn(regionsInTransition).when(rs).getRegionsInTransitionOrderedByTimestamp();
+    Mockito.when(am.getState(any(String.class))).thenReturn(new RegionState(null, null));

     // Render to a string
     StringWriter sw = new StringWriter();