From 5df9651581f599ba9bcbb0def660870ab0398ccc Mon Sep 17 00:00:00 2001
From: Andrew Purtell
Date: Fri, 3 Nov 2017 15:03:27 -0700
Subject: [PATCH] HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster

---
Notes (free-form text in this position is ignored by `git am`):
* RSGroupInfoManager gains REASSIGN_WAIT_INTERVAL_KEY ("hbase.rsgroup.reassign.wait")
  and DEFAULT_REASSIGN_WAIT_INTERVAL (30 * 1000L). The value is read with
  conf.getLong() and handed to Thread.sleep(), so it is in milliseconds.
* RSGroupInfoManagerImpl gains FailedOpenUpdaterThread, a daemon thread registered
  as a ServerListener. serverAdded() wakes it; it then sleeps for the configured
  interval and calls updateFailedAssignments(), which calls
  getAssignmentManager().unassign(region) for every region in transition whose
  RegionStateNode.isStuck() returns true. serverRemoved() is a no-op.
* An existing wait() guard on `changed` is turned from `if` into `while`, so the
  flag is re-checked after every wake-up.

 .../hbase/rsgroup/RSGroupInfoManager.java     |  4 +
 .../hbase/rsgroup/RSGroupInfoManagerImpl.java | 89 ++++++++++++++++++-
 2 files changed, 92 insertions(+), 1 deletion(-)

diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
index c8fee448354..3fb40da48c9 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
@@ -36,6 +36,10 @@ import org.apache.hadoop.hbase.net.Address;
  */
 @InterfaceAudience.Private
 public interface RSGroupInfoManager {
+
+  String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
+  long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;
+
   //Assigned before user tables
   TableName RSGROUP_TABLE_NAME =
       TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
index 9520f5fbf8a..7cf04c7cc09 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Coprocessor;
@@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.master.TableStateManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
@@ -144,6 +146,7 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
   private Set<String> prevRSGroups = new HashSet<>();
   private final ServerEventsListenerThread serverEventsListenerThread =
       new ServerEventsListenerThread();
+  private FailedOpenUpdaterThread failedOpenUpdaterThread;
 
   private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
     this.masterServices = masterServices;
@@ -156,6 +159,9 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
     rsGroupStartupWorker.start();
     serverEventsListenerThread.start();
     masterServices.getServerManager().registerListener(serverEventsListenerThread);
+    failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
+    failedOpenUpdaterThread.start();
+    masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
   }
 
   static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
@@ -564,6 +570,26 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
     flushConfig(newGroupMap);
   }
 
+  // Called by FailedOpenUpdaterThread
+  private void updateFailedAssignments() {
+    // Kick all regions in FAILED_OPEN state
+    List<RegionInfo> stuckAssignments = Lists.newArrayList();
+    for (RegionStateNode state:
+        masterServices.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
+      if (state.isStuck()) {
+        stuckAssignments.add(state.getRegionInfo());
+      }
+    }
+    for (RegionInfo region: stuckAssignments) {
+      LOG.info("Retrying assignment of " + region);
+      try {
+        masterServices.getAssignmentManager().unassign(region);
+      } catch (IOException e) {
+        LOG.warn("Unable to reassign " + region, e);
+      }
+    }
+  }
+
   /**
    * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
    * servers. Notifications about server changes are received by registering {@link ServerListener}.
@@ -608,7 +634,7 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
           }
           try {
             synchronized (this) {
-              if(!changed) {
+              while (!changed) {
                 wait();
               }
               changed = false;
@@ -623,6 +649,67 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
     }
   }
 
+  private class FailedOpenUpdaterThread extends Thread implements ServerListener {
+    private final long waitInterval;
+    private volatile boolean hasChanged = false;
+
+    public FailedOpenUpdaterThread(Configuration conf) {
+      this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY,
+          DEFAULT_REASSIGN_WAIT_INTERVAL);
+      setDaemon(true);
+    }
+
+    @Override
+    public void serverAdded(ServerName serverName) {
+      serverChanged();
+    }
+
+    @Override
+    public void serverRemoved(ServerName serverName) {
+    }
+
+    @Override
+    public void run() {
+      while (isMasterRunning(masterServices)) {
+        boolean interrupted = false;
+        try {
+          synchronized (this) {
+            while (!hasChanged) {
+              wait();
+            }
+            hasChanged = false;
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted", e);
+          interrupted = true;
+        }
+        if (!isMasterRunning(masterServices) || interrupted) {
+          continue;
+        }
+
+        // First, wait a while in case more servers are about to rejoin the cluster
+        try {
+          Thread.sleep(waitInterval);
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted", e);
+        }
+        if (!isMasterRunning(masterServices)) {
+          continue;
+        }
+
+        // Kick all regions in FAILED_OPEN state
+        updateFailedAssignments();
+      }
+    }
+
+    public void serverChanged() {
+      synchronized (this) {
+        hasChanged = true;
+        this.notify();
+      }
+    }
+  }
+
   private class RSGroupStartupWorker extends Thread {
     private final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class);
     private volatile boolean online = false;