HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster
This commit is contained in:
parent
e61f6ff0ec
commit
1e227acd65
|
@ -36,6 +36,10 @@ import org.apache.hadoop.hbase.net.Address;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface RSGroupInfoManager {
|
||||
|
||||
String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
|
||||
long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;
|
||||
|
||||
//Assigned before user tables
|
||||
TableName RSGROUP_TABLE_NAME =
|
||||
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");
|
||||
|
|
|
@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.constraint.ConstraintException;
|
|||
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.ServerListener;
|
||||
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
||||
|
@ -81,6 +82,7 @@ import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
|
|||
import org.apache.hadoop.hbase.security.access.AccessControlLists;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
@ -119,6 +121,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
|
|||
private volatile Set<String> prevRSGroups;
|
||||
private RSGroupSerDe rsGroupSerDe;
|
||||
private DefaultServerUpdater defaultServerUpdater;
|
||||
private FailedOpenUpdater failedOpenUpdater;
|
||||
private boolean isInit = false;
|
||||
|
||||
public RSGroupInfoManagerImpl(MasterServices master) throws IOException {
|
||||
|
@ -136,8 +139,10 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
|
|||
refresh();
|
||||
rsGroupStartupWorker.start();
|
||||
defaultServerUpdater = new DefaultServerUpdater(this);
|
||||
Threads.setDaemonThreadRunning(defaultServerUpdater);
|
||||
failedOpenUpdater = new FailedOpenUpdater(this);
|
||||
Threads.setDaemonThreadRunning(failedOpenUpdater);
|
||||
master.getServerManager().registerListener(this);
|
||||
defaultServerUpdater.start();
|
||||
isInit = true;
|
||||
}
|
||||
|
||||
|
@ -493,6 +498,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
|
|||
@Override
|
||||
public void serverAdded(ServerName serverName) {
|
||||
defaultServerUpdater.serverChanged();
|
||||
failedOpenUpdater.serverChanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -503,18 +509,22 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
|
|||
private static class DefaultServerUpdater extends Thread {
|
||||
private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class);
|
||||
private RSGroupInfoManagerImpl mgr;
|
||||
private boolean hasChanged = false;
|
||||
private volatile boolean hasChanged = false;
|
||||
|
||||
public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) {
|
||||
this.mgr = mgr;
|
||||
setName(DefaultServerUpdater.class.getName()+"-" + mgr.master.getServerName());
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
List<Address> prevDefaultServers = new LinkedList<Address>();
|
||||
while(!mgr.master.isAborted() || !mgr.master.isStopped()) {
|
||||
while (!mgr.master.isAborted() && !mgr.master.isStopped()) {
|
||||
try {
|
||||
LOG.info("Updating default servers.");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Updating default servers");
|
||||
}
|
||||
List<Address> servers = mgr.getDefaultServers();
|
||||
Collections.sort(servers, new Comparator<Address>() {
|
||||
@Override
|
||||
|
@ -533,12 +543,13 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
|
|||
}
|
||||
try {
|
||||
synchronized (this) {
|
||||
if(!hasChanged) {
|
||||
while (!hasChanged) {
|
||||
wait();
|
||||
}
|
||||
hasChanged = false;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Interrupted", e);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to update default servers", e);
|
||||
|
@ -546,6 +557,75 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
|
|||
}
|
||||
}
|
||||
|
||||
// Called for both server additions and removals
|
||||
public void serverChanged() {
|
||||
synchronized (this) {
|
||||
hasChanged = true;
|
||||
this.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class FailedOpenUpdater extends Thread {
|
||||
private static final Log LOG = LogFactory.getLog(FailedOpenUpdater.class);
|
||||
|
||||
private final RSGroupInfoManagerImpl mgr;
|
||||
private final long waitInterval;
|
||||
private volatile boolean hasChanged = false;
|
||||
|
||||
public FailedOpenUpdater(RSGroupInfoManagerImpl mgr) {
|
||||
this.mgr = mgr;
|
||||
this.waitInterval = mgr.master.getConfiguration().getLong(REASSIGN_WAIT_INTERVAL_KEY,
|
||||
DEFAULT_REASSIGN_WAIT_INTERVAL);
|
||||
setName(FailedOpenUpdater.class.getName()+"-" + mgr.master.getServerName());
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (!mgr.master.isAborted() && !mgr.master.isStopped()) {
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
synchronized (this) {
|
||||
while (!hasChanged) {
|
||||
wait();
|
||||
}
|
||||
hasChanged = false;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Interrupted", e);
|
||||
interrupted = true;
|
||||
}
|
||||
if (mgr.master.isAborted() || mgr.master.isStopped() || interrupted) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// First, wait a while in case more servers are about to rejoin the cluster
|
||||
try {
|
||||
Thread.sleep(waitInterval);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Interrupted", e);
|
||||
}
|
||||
if (mgr.master.isAborted() || mgr.master.isStopped()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Kick all regions in FAILED_OPEN state
|
||||
List<HRegionInfo> failedAssignments = Lists.newArrayList();
|
||||
for (RegionState state:
|
||||
mgr.master.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
|
||||
if (state.isFailedOpen()) {
|
||||
failedAssignments.add(state.getRegion());
|
||||
}
|
||||
}
|
||||
for (HRegionInfo region: failedAssignments) {
|
||||
LOG.info("Retrying assignment of " + region);
|
||||
mgr.master.getAssignmentManager().unassign(region);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Only called for server additions
|
||||
public void serverChanged() {
|
||||
synchronized (this) {
|
||||
hasChanged = true;
|
||||
|
|
Loading…
Reference in New Issue