HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster

This commit is contained in:
Andrew Purtell 2017-11-03 15:03:27 -07:00
parent b9b0f15cdb
commit 5df9651581
2 changed files with 92 additions and 1 deletions

View File

@ -36,6 +36,10 @@ import org.apache.hadoop.hbase.net.Address;
*/
@InterfaceAudience.Private
public interface RSGroupInfoManager {
String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;
//Assigned before user tables
TableName RSGROUP_TABLE_NAME =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");

View File

@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
@ -144,6 +146,7 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private Set<String> prevRSGroups = new HashSet<>();
private final ServerEventsListenerThread serverEventsListenerThread =
new ServerEventsListenerThread();
private FailedOpenUpdaterThread failedOpenUpdaterThread;
private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
this.masterServices = masterServices;
@ -156,6 +159,9 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
rsGroupStartupWorker.start();
serverEventsListenerThread.start();
masterServices.getServerManager().registerListener(serverEventsListenerThread);
failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
failedOpenUpdaterThread.start();
masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
}
static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
@ -564,6 +570,26 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
flushConfig(newGroupMap);
}
// Called by FailedOpenUpdaterThread
private void updateFailedAssignments() {
// Kick all regions in FAILED_OPEN state
List<RegionInfo> stuckAssignments = Lists.newArrayList();
for (RegionStateNode state:
masterServices.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
if (state.isStuck()) {
stuckAssignments.add(state.getRegionInfo());
}
}
for (RegionInfo region: stuckAssignments) {
LOG.info("Retrying assignment of " + region);
try {
masterServices.getAssignmentManager().unassign(region);
} catch (IOException e) {
LOG.warn("Unable to reassign " + region, e);
}
}
}
/**
* Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
* servers. Notifications about server changes are received by registering {@link ServerListener}.
@ -608,7 +634,7 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
try {
synchronized (this) {
if(!changed) {
while (!changed) {
wait();
}
changed = false;
@ -623,6 +649,67 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
}
private class FailedOpenUpdaterThread extends Thread implements ServerListener {
private final long waitInterval;
private volatile boolean hasChanged = false;
public FailedOpenUpdaterThread(Configuration conf) {
this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY,
DEFAULT_REASSIGN_WAIT_INTERVAL);
setDaemon(true);
}
@Override
public void serverAdded(ServerName serverName) {
serverChanged();
}
@Override
public void serverRemoved(ServerName serverName) {
}
@Override
public void run() {
while (isMasterRunning(masterServices)) {
boolean interrupted = false;
try {
synchronized (this) {
while (!hasChanged) {
wait();
}
hasChanged = false;
}
} catch (InterruptedException e) {
LOG.warn("Interrupted", e);
interrupted = true;
}
if (!isMasterRunning(masterServices) || interrupted) {
continue;
}
// First, wait a while in case more servers are about to rejoin the cluster
try {
Thread.sleep(waitInterval);
} catch (InterruptedException e) {
LOG.warn("Interrupted", e);
}
if (!isMasterRunning(masterServices)) {
continue;
}
// Kick all regions in FAILED_OPEN state
updateFailedAssignments();
}
}
public void serverChanged() {
synchronized (this) {
hasChanged = true;
this.notify();
}
}
}
private class RSGroupStartupWorker extends Thread {
private final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class);
private volatile boolean online = false;