HBASE-25774 ServerManager.getOnlineServer may miss some region servers when refreshing state in some procedure implementations
Revert "HBASE-25032 Wait for region server to become online before adding it to online servers in Master (#2769)"
This reverts commit 1e4639d2eb.
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
This commit is contained in:
parent
6cfff27465
commit
02b018cf1a
|
@ -203,8 +203,7 @@ public class ServerManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Let the server manager know a regionserver is requesting configurations.
|
* Let the server manager know a new regionserver has come online
|
||||||
* Regionserver will not be added here, but in its first report.
|
|
||||||
* @param request the startup request
|
* @param request the startup request
|
||||||
* @param versionNumber the version number of the new regionserver
|
* @param versionNumber the version number of the new regionserver
|
||||||
* @param version the version of the new regionserver, could contain strings like "SNAPSHOT"
|
* @param version the version of the new regionserver, could contain strings like "SNAPSHOT"
|
||||||
|
@ -227,6 +226,10 @@ public class ServerManager {
|
||||||
ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
|
ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
|
||||||
checkClockSkew(sn, request.getServerCurrentTime());
|
checkClockSkew(sn, request.getServerCurrentTime());
|
||||||
checkIsDead(sn, "STARTUP");
|
checkIsDead(sn, "STARTUP");
|
||||||
|
if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
|
||||||
|
LOG.warn(
|
||||||
|
"THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
|
||||||
|
}
|
||||||
return sn;
|
return sn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -277,6 +280,7 @@ public class ServerManager {
|
||||||
if (null == this.onlineServers.replace(sn, sl)) {
|
if (null == this.onlineServers.replace(sn, sl)) {
|
||||||
// Already have this host+port combo and its just different start code?
|
// Already have this host+port combo and its just different start code?
|
||||||
// Just let the server in. Presume master joining a running cluster.
|
// Just let the server in. Presume master joining a running cluster.
|
||||||
|
// recordNewServer is what happens at the end of reportServerStartup.
|
||||||
// The only thing we are skipping is passing back to the regionserver
|
// The only thing we are skipping is passing back to the regionserver
|
||||||
// the ServerName to use. Here we presume a master has already done
|
// the ServerName to use. Here we presume a master has already done
|
||||||
// that so we'll press on with whatever it gave us for ServerName.
|
// that so we'll press on with whatever it gave us for ServerName.
|
||||||
|
|
|
@ -1031,9 +1031,10 @@ public class HRegionServer extends Thread implements
|
||||||
// node was created, in case any coprocessors want to use ZooKeeper
|
// node was created, in case any coprocessors want to use ZooKeeper
|
||||||
this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
|
this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
|
||||||
|
|
||||||
// Get configurations from the Master. Break if server is stopped or
|
// Try and register with the Master; tell it we are here. Break if server is stopped or
|
||||||
// the clusterup flag is down or hdfs went wacky. Then start up all Services.
|
// the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
|
||||||
// Use RetryCounter to get backoff in case Master is struggling to come up.
|
// start up all Services. Use RetryCounter to get backoff in case Master is struggling to
|
||||||
|
// come up.
|
||||||
LOG.debug("About to register with Master.");
|
LOG.debug("About to register with Master.");
|
||||||
RetryCounterFactory rcf =
|
RetryCounterFactory rcf =
|
||||||
new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
|
new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
|
||||||
|
@ -1066,7 +1067,7 @@ public class HRegionServer extends Thread implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run mode.
|
// We registered with the Master. Go into run mode.
|
||||||
long lastMsg = System.currentTimeMillis();
|
long lastMsg = System.currentTimeMillis();
|
||||||
long oldRequestCount = -1;
|
long oldRequestCount = -1;
|
||||||
// The main run loop.
|
// The main run loop.
|
||||||
|
@ -1100,14 +1101,7 @@ public class HRegionServer extends Thread implements
|
||||||
}
|
}
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
if ((now - lastMsg) >= msgInterval) {
|
if ((now - lastMsg) >= msgInterval) {
|
||||||
// Register with the Master now that our setup is complete.
|
tryRegionServerReport(lastMsg, now);
|
||||||
if (tryRegionServerReport(lastMsg, now) && !online.get()) {
|
|
||||||
// Wake up anyone waiting for this server to online
|
|
||||||
synchronized (online) {
|
|
||||||
online.set(true);
|
|
||||||
online.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
lastMsg = System.currentTimeMillis();
|
lastMsg = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
if (!isStopped() && !isAborted()) {
|
if (!isStopped() && !isAborted()) {
|
||||||
|
@ -1276,12 +1270,12 @@ public class HRegionServer extends Thread implements
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
protected boolean tryRegionServerReport(long reportStartTime, long reportEndTime)
|
protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
RegionServerStatusService.BlockingInterface rss = rssStub;
|
RegionServerStatusService.BlockingInterface rss = rssStub;
|
||||||
if (rss == null) {
|
if (rss == null) {
|
||||||
// the current server could be stopping.
|
// the current server could be stopping.
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
|
ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
|
||||||
try {
|
try {
|
||||||
|
@ -1301,9 +1295,7 @@ public class HRegionServer extends Thread implements
|
||||||
// Couldn't connect to the master, get location from zk and reconnect
|
// Couldn't connect to the master, get location from zk and reconnect
|
||||||
// Method blocks until new master is found or we are stopped
|
// Method blocks until new master is found or we are stopped
|
||||||
createRegionServerStatusStub(true);
|
createRegionServerStatusStub(true);
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1678,6 +1670,11 @@ public class HRegionServer extends Thread implements
|
||||||
", sessionid=0x" +
|
", sessionid=0x" +
|
||||||
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
|
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
|
||||||
|
|
||||||
|
// Wake up anyone waiting for this server to online
|
||||||
|
synchronized (online) {
|
||||||
|
online.set(true);
|
||||||
|
online.notifyAll();
|
||||||
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
stop("Failed initialization");
|
stop("Failed initialization");
|
||||||
throw convertThrowableToIOE(cleanup(e, "Failed init"),
|
throw convertThrowableToIOE(cleanup(e, "Failed init"),
|
||||||
|
@ -2851,9 +2848,10 @@ public class HRegionServer extends Thread implements
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Run initialization using parameters passed us by the master.
|
* Let the master know we're here Run initialization using parameters passed
|
||||||
|
* us by the master.
|
||||||
* @return A Map of key/value configurations we got from the Master else
|
* @return A Map of key/value configurations we got from the Master else
|
||||||
* null if we failed during report.
|
* null if we failed to register.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private RegionServerStartupResponse reportForDuty() throws IOException {
|
private RegionServerStartupResponse reportForDuty() throws IOException {
|
||||||
|
|
|
@ -446,9 +446,8 @@ public class MiniHBaseCluster extends HBaseCluster {
|
||||||
ServerName rsServerName = t.getRegionServer().getServerName();
|
ServerName rsServerName = t.getRegionServer().getServerName();
|
||||||
|
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
ClusterMetrics clusterStatus;
|
ClusterMetrics clusterStatus = getClusterMetrics();
|
||||||
while ((System.currentTimeMillis() - start) < timeout) {
|
while ((System.currentTimeMillis() - start) < timeout) {
|
||||||
clusterStatus = getClusterMetrics();
|
|
||||||
if (clusterStatus != null && clusterStatus.getLiveServerMetrics().containsKey(rsServerName)) {
|
if (clusterStatus != null && clusterStatus.getLiveServerMetrics().containsKey(rsServerName)) {
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,9 +58,8 @@ public class TestGetReplicationLoad {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean tryRegionServerReport(long reportStartTime, long reportEndTime) {
|
protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
|
||||||
// do nothing
|
// do nothing
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -61,6 +61,11 @@ public class TestMasterMetrics {
|
||||||
public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
|
public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
|
||||||
super(conf);
|
super(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class MyRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
|
public static class MyRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
|
||||||
|
@ -68,6 +73,11 @@ public class TestMasterMetrics {
|
||||||
public MyRegionServer(Configuration conf) throws IOException, InterruptedException {
|
public MyRegionServer(Configuration conf) throws IOException, InterruptedException {
|
||||||
super(conf);
|
super(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
@ -98,14 +108,11 @@ public class TestMasterMetrics {
|
||||||
request.setServer(ProtobufUtil.toServerName(serverName));
|
request.setServer(ProtobufUtil.toServerName(serverName));
|
||||||
long expectedRequestNumber = 10000;
|
long expectedRequestNumber = 10000;
|
||||||
|
|
||||||
|
MetricsMasterSource masterSource = master.getMasterMetrics().getMetricsSource();
|
||||||
ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder()
|
ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder()
|
||||||
.setTotalNumberOfRequests(expectedRequestNumber).build();
|
.setTotalNumberOfRequests(expectedRequestNumber).build();
|
||||||
request.setLoad(sl);
|
request.setLoad(sl);
|
||||||
|
|
||||||
MetricsMasterSource masterSource = master.getMasterMetrics().getMetricsSource();
|
|
||||||
// Init master source again to reset cluster requests counter
|
|
||||||
masterSource.init();
|
|
||||||
|
|
||||||
master.getMasterRpcServices().regionServerReport(null, request.build());
|
master.getMasterRpcServices().regionServerReport(null, request.build());
|
||||||
metricsHelper.assertCounter("cluster_requests", expectedRequestNumber, masterSource);
|
metricsHelper.assertCounter("cluster_requests", expectedRequestNumber, masterSource);
|
||||||
|
|
||||||
|
|
|
@ -84,14 +84,13 @@ public class TestCompactionInDeadRegionServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean tryRegionServerReport(long reportStartTime, long reportEndTime)
|
protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
super.tryRegionServerReport(reportStartTime, reportEndTime);
|
super.tryRegionServerReport(reportStartTime, reportEndTime);
|
||||||
} catch (YouAreDeadException e) {
|
} catch (YouAreDeadException e) {
|
||||||
// ignore, do not abort
|
// ignore, do not abort
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -85,15 +85,13 @@ public class TestShutdownWhileWALBroken {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean tryRegionServerReport(long reportStartTime, long reportEndTime)
|
protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
super.tryRegionServerReport(reportStartTime, reportEndTime);
|
super.tryRegionServerReport(reportStartTime, reportEndTime);
|
||||||
} catch (YouAreDeadException e) {
|
} catch (YouAreDeadException e) {
|
||||||
LOG.info("Caught YouAreDeadException, ignore", e);
|
LOG.info("Caught YouAreDeadException, ignore", e);
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue