HBASE-22933 Do not need to kick reassign for rs group change any more (#550)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
6ece958268
commit
4a4208f872
|
@ -15,7 +15,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.rsgroup;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
@ -33,7 +32,6 @@ import java.util.OptionalLong;
|
|||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
@ -45,7 +43,6 @@ import org.apache.hadoop.hbase.client.Delete;
|
|||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
|
@ -59,7 +56,6 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
|||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.ServerListener;
|
||||
import org.apache.hadoop.hbase.master.TableStateManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
|
||||
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
|
@ -136,7 +132,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
private Set<String> prevRSGroups = new HashSet<>();
|
||||
private final ServerEventsListenerThread serverEventsListenerThread =
|
||||
new ServerEventsListenerThread();
|
||||
private FailedOpenUpdaterThread failedOpenUpdaterThread;
|
||||
|
||||
private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
|
||||
this.masterServices = masterServices;
|
||||
|
@ -150,9 +145,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
refresh();
|
||||
serverEventsListenerThread.start();
|
||||
masterServices.getServerManager().registerListener(serverEventsListenerThread);
|
||||
failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
|
||||
failedOpenUpdaterThread.start();
|
||||
masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
|
||||
}
|
||||
|
||||
static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
|
||||
|
@ -625,26 +617,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
flushConfig(newGroupMap);
|
||||
}
|
||||
|
||||
// Called by FailedOpenUpdaterThread
|
||||
private void updateFailedAssignments() {
|
||||
// Kick all regions in FAILED_OPEN state
|
||||
List<RegionInfo> stuckAssignments = Lists.newArrayList();
|
||||
for (RegionStateNode state : masterServices.getAssignmentManager().getRegionStates()
|
||||
.getRegionsInTransition()) {
|
||||
if (state.isStuck()) {
|
||||
stuckAssignments.add(state.getRegionInfo());
|
||||
}
|
||||
}
|
||||
for (RegionInfo region : stuckAssignments) {
|
||||
LOG.info("Retrying assignment of " + region);
|
||||
try {
|
||||
masterServices.getAssignmentManager().unassign(region);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to reassign " + region, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
|
||||
* servers. Notifications about server changes are received by registering {@link ServerListener}.
|
||||
|
@ -704,66 +676,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
}
|
||||
}
|
||||
|
||||
private class FailedOpenUpdaterThread extends Thread implements ServerListener {
|
||||
private final long waitInterval;
|
||||
private volatile boolean hasChanged = false;
|
||||
|
||||
public FailedOpenUpdaterThread(Configuration conf) {
|
||||
this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY, DEFAULT_REASSIGN_WAIT_INTERVAL);
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serverAdded(ServerName serverName) {
|
||||
serverChanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serverRemoved(ServerName serverName) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (isMasterRunning(masterServices)) {
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
synchronized (this) {
|
||||
while (!hasChanged) {
|
||||
wait();
|
||||
}
|
||||
hasChanged = false;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Interrupted", e);
|
||||
interrupted = true;
|
||||
}
|
||||
if (!isMasterRunning(masterServices) || interrupted) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// First, wait a while in case more servers are about to rejoin the cluster
|
||||
try {
|
||||
Thread.sleep(waitInterval);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Interrupted", e);
|
||||
}
|
||||
if (!isMasterRunning(masterServices)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Kick all regions in FAILED_OPEN state
|
||||
updateFailedAssignments();
|
||||
}
|
||||
}
|
||||
|
||||
public void serverChanged() {
|
||||
synchronized (this) {
|
||||
hasChanged = true;
|
||||
this.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class RSGroupStartupWorker extends Thread {
|
||||
private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
|
||||
private volatile boolean online = false;
|
||||
|
|
Loading…
Reference in New Issue