HBASE-20698 Master doesn't record the right server version until a newly started region server calls the regionServerReport method

This commit is contained in:
Guanghao Zhang 2018-06-08 16:00:20 +08:00
parent 519236b4af
commit 5fd16f3853
10 changed files with 95 additions and 45 deletions

View File

@ -32,6 +32,14 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface ServerMetrics { public interface ServerMetrics {
ServerName getServerName(); ServerName getServerName();
/**
 * Returns the version number of the region server these metrics describe.
 * <p>
 * This default implementation always returns 0; an implementation that knows the server's
 * version is expected to override it.
 * @return the version number of a regionserver.
 */
default int getVersionNumber() {
return 0;
}
/** /**
* @return the number of requests per second. * @return the number of requests per second.
*/ */

View File

@ -48,14 +48,23 @@ public final class ServerMetricsBuilder {
return newBuilder(sn).build(); return newBuilder(sn).build();
} }
/**
 * Builds a {@link ServerMetrics} for the given server with only the version number set
 * explicitly; no other builder setter is called.
 * @param sn the server the metrics belong to
 * @param versionNumber the version number to record for that server
 * @return the metrics produced by the builder
 */
public static ServerMetrics of(ServerName sn, int versionNumber) {
  final ServerMetricsBuilder builder = newBuilder(sn);
  builder.setVersionNumber(versionNumber);
  return builder.build();
}
public static ServerMetrics toServerMetrics(ClusterStatusProtos.LiveServerInfo serverInfo) { public static ServerMetrics toServerMetrics(ClusterStatusProtos.LiveServerInfo serverInfo) {
return toServerMetrics(ProtobufUtil.toServerName(serverInfo.getServer()), return toServerMetrics(ProtobufUtil.toServerName(serverInfo.getServer()), 0,
serverInfo.getServerLoad()); serverInfo.getServerLoad());
} }
public static ServerMetrics toServerMetrics(ServerName serverName, public static ServerMetrics toServerMetrics(ServerName serverName,
ClusterStatusProtos.ServerLoad serverLoadPB) { ClusterStatusProtos.ServerLoad serverLoadPB) {
return ServerMetricsBuilder.newBuilder(serverName) return toServerMetrics(serverName, 0, serverLoadPB);
}
public static ServerMetrics toServerMetrics(ServerName serverName, int versionNumber,
ClusterStatusProtos.ServerLoad serverLoadPB) {
return ServerMetricsBuilder.newBuilder(serverName).setVersionNumber(versionNumber)
.setRequestCountPerSecond(serverLoadPB.getNumberOfRequests()) .setRequestCountPerSecond(serverLoadPB.getNumberOfRequests())
.setRequestCount(serverLoadPB.getTotalNumberOfRequests()) .setRequestCount(serverLoadPB.getTotalNumberOfRequests())
.setInfoServerPort(serverLoadPB.getInfoServerPort()) .setInfoServerPort(serverLoadPB.getInfoServerPort())
@ -110,6 +119,7 @@ public final class ServerMetricsBuilder {
} }
private final ServerName serverName; private final ServerName serverName;
private int versionNumber;
private long requestCountPerSecond; private long requestCountPerSecond;
private long requestCount; private long requestCount;
private Size usedHeapSize = Size.ZERO; private Size usedHeapSize = Size.ZERO;
@ -126,6 +136,11 @@ public final class ServerMetricsBuilder {
this.serverName = serverName; this.serverName = serverName;
} }
/**
 * Sets the version number stored in this builder.
 * @param versionNumber the version number of the region server
 * @return this builder, so that calls can be chained
 */
public ServerMetricsBuilder setVersionNumber(int versionNumber) {
this.versionNumber = versionNumber;
return this;
}
public ServerMetricsBuilder setRequestCountPerSecond(long value) { public ServerMetricsBuilder setRequestCountPerSecond(long value) {
this.requestCountPerSecond = value; this.requestCountPerSecond = value;
return this; return this;
@ -184,6 +199,7 @@ public final class ServerMetricsBuilder {
public ServerMetrics build() { public ServerMetrics build() {
return new ServerMetricsImpl( return new ServerMetricsImpl(
serverName, serverName,
versionNumber,
requestCountPerSecond, requestCountPerSecond,
requestCount, requestCount,
usedHeapSize, usedHeapSize,
@ -199,6 +215,7 @@ public final class ServerMetricsBuilder {
private static class ServerMetricsImpl implements ServerMetrics { private static class ServerMetricsImpl implements ServerMetrics {
private final ServerName serverName; private final ServerName serverName;
private final int versionNumber;
private final long requestCountPerSecond; private final long requestCountPerSecond;
private final long requestCount; private final long requestCount;
private final Size usedHeapSize; private final Size usedHeapSize;
@ -212,11 +229,13 @@ public final class ServerMetricsBuilder {
private final long reportTimestamp; private final long reportTimestamp;
private final long lastReportTimestamp; private final long lastReportTimestamp;
ServerMetricsImpl(ServerName serverName, long requestCountPerSecond, long requestCount, ServerMetricsImpl(ServerName serverName, int versionNumber, long requestCountPerSecond,
Size usedHeapSize, Size maxHeapSize, int infoServerPort, List<ReplicationLoadSource> sources, long requestCount, Size usedHeapSize, Size maxHeapSize, int infoServerPort,
ReplicationLoadSink sink, Map<byte[], RegionMetrics> regionStatus, List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
Set<String> coprocessorNames, long reportTimestamp, long lastReportTimestamp) { Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
long lastReportTimestamp) {
this.serverName = Preconditions.checkNotNull(serverName); this.serverName = Preconditions.checkNotNull(serverName);
this.versionNumber = versionNumber;
this.requestCountPerSecond = requestCountPerSecond; this.requestCountPerSecond = requestCountPerSecond;
this.requestCount = requestCount; this.requestCount = requestCount;
this.usedHeapSize = Preconditions.checkNotNull(usedHeapSize); this.usedHeapSize = Preconditions.checkNotNull(usedHeapSize);
@ -234,6 +253,12 @@ public final class ServerMetricsBuilder {
public ServerName getServerName() { public ServerName getServerName() {
return serverName; return serverName;
} }
/**
 * @return the value of this instance's {@code versionNumber} field
 */
@Override
public int getVersionNumber() {
return versionNumber;
}
@Override @Override
public long getRequestCountPerSecond() { public long getRequestCountPerSecond() {
return requestCountPerSecond; return requestCountPerSecond;

View File

@ -454,14 +454,14 @@ public class MasterRpcServices extends RSRpcServices
RpcController controller, RegionServerReportRequest request) throws ServiceException { RpcController controller, RegionServerReportRequest request) throws ServiceException {
try { try {
master.checkServiceStarted(); master.checkServiceStarted();
int version = VersionInfoUtil.getCurrentClientVersionNumber();
ClusterStatusProtos.ServerLoad sl = request.getLoad(); ClusterStatusProtos.ServerLoad sl = request.getLoad();
ServerName serverName = ProtobufUtil.toServerName(request.getServer()); ServerName serverName = ProtobufUtil.toServerName(request.getServer());
ServerMetrics oldLoad = master.getServerManager().getLoad(serverName); ServerMetrics oldLoad = master.getServerManager().getLoad(serverName);
ServerMetrics newLoad = ServerMetricsBuilder.toServerMetrics(serverName, sl); ServerMetrics newLoad = ServerMetricsBuilder.toServerMetrics(serverName, version, sl);
master.getServerManager().regionServerReport(serverName, newLoad); master.getServerManager().regionServerReport(serverName, newLoad);
int version = VersionInfoUtil.getCurrentClientVersionNumber(); master.getAssignmentManager()
master.getAssignmentManager().reportOnlineRegions(serverName, .reportOnlineRegions(serverName, newLoad.getRegionMetrics().keySet());
version, newLoad.getRegionMetrics().keySet());
if (sl != null && master.metricsMaster != null) { if (sl != null && master.metricsMaster != null) {
// Up our metrics. // Up our metrics.
master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests() master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests()
@ -479,11 +479,12 @@ public class MasterRpcServices extends RSRpcServices
// Register with server manager // Register with server manager
try { try {
master.checkServiceStarted(); master.checkServiceStarted();
int version = VersionInfoUtil.getCurrentClientVersionNumber();
InetAddress ia = master.getRemoteInetAddress( InetAddress ia = master.getRemoteInetAddress(
request.getPort(), request.getServerStartCode()); request.getPort(), request.getServerStartCode());
// if regionserver passed hostname to use, // if regionserver passed hostname to use,
// then use it instead of doing a reverse DNS lookup // then use it instead of doing a reverse DNS lookup
ServerName rs = master.getServerManager().regionServerStartup(request, ia); ServerName rs = master.getServerManager().regionServerStartup(request, version, ia);
// Send back some config info // Send back some config info
RegionServerStartupResponse.Builder resp = createConfigurationSubset(); RegionServerStartupResponse.Builder resp = createConfigurationSubset();

View File

@ -222,11 +222,13 @@ public class ServerManager {
/** /**
* Let the server manager know a new regionserver has come online * Let the server manager know a new regionserver has come online
* @param request the startup request * @param request the startup request
* @param versionNumber the version of the new regionserver
* @param ia the InetAddress from which request is received * @param ia the InetAddress from which request is received
* @return The ServerName we know this server as. * @return The ServerName we know this server as.
* @throws IOException * @throws IOException
*/ */
ServerName regionServerStartup(RegionServerStartupRequest request, InetAddress ia) ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
InetAddress ia)
throws IOException { throws IOException {
// Test for case where we get a region startup message from a regionserver // Test for case where we get a region startup message from a regionserver
// that has been quickly restarted but whose znode expiration handler has // that has been quickly restarted but whose znode expiration handler has
@ -242,7 +244,7 @@ public class ServerManager {
request.getServerStartCode()); request.getServerStartCode());
checkClockSkew(sn, request.getServerCurrentTime()); checkClockSkew(sn, request.getServerCurrentTime());
checkIsDead(sn, "STARTUP"); checkIsDead(sn, "STARTUP");
if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn))) { if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber))) {
LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup" LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
+ " could not record the server: " + sn); + " could not record the server: " + sn);
} }
@ -1058,4 +1060,9 @@ public class ServerManager {
removeRegion(hri); removeRegion(hri);
} }
} }
/**
 * Looks up the version number held in the metrics of an online region server.
 * @param serverName the server to look up in {@code onlineServers}
 * @return the version number from that server's metrics, or 0 when {@code onlineServers} has no
 *         entry for the server
 */
public int getServerVersion(final ServerName serverName) {
  final ServerMetrics metrics = onlineServers.get(serverName);
  if (metrics == null) {
    return 0;
  }
  return metrics.getVersionNumber();
}
} }

View File

@ -913,7 +913,7 @@ public class AssignmentManager implements ServerListener {
master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey)); master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
// If the RS is < 2.0 throw an exception to abort the operation, we are handling the split // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
if (regionStates.getOrCreateServer(serverName).getVersionNumber() < 0x0200000) { if (master.getServerManager().getServerVersion(serverName) < 0x0200000) {
throw new UnsupportedOperationException(String.format( throw new UnsupportedOperationException(String.format(
"Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB)); "Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB));
} }
@ -936,7 +936,7 @@ public class AssignmentManager implements ServerListener {
master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB)); master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
// If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
if (regionStates.getOrCreateServer(serverName).getVersionNumber() < 0x0200000) { if (master.getServerManager().getServerVersion(serverName) < 0x0200000) {
throw new UnsupportedOperationException(String.format( throw new UnsupportedOperationException(String.format(
"Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA, "Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA,
hriB)); hriB));
@ -948,13 +948,13 @@ public class AssignmentManager implements ServerListener {
// ============================================================================================ // ============================================================================================
/** /**
* the master will call this method when the RS sends the regionServerReport(). * the master will call this method when the RS sends the regionServerReport().
* the report will contain the "hbase version" and the "online regions". * the report will contain the "online regions".
* this method will check the online regions against the in-memory state of the AM, * this method will check the online regions against the in-memory state of the AM,
* if there is a mismatch we will try to fence out the RS with the assumption * if there is a mismatch we will try to fence out the RS with the assumption
* that something went wrong on the RS side. * that something went wrong on the RS side.
*/ */
public void reportOnlineRegions(final ServerName serverName, public void reportOnlineRegions(final ServerName serverName, final Set<byte[]> regionNames)
final int versionNumber, final Set<byte[]> regionNames) throws YouAreDeadException { throws YouAreDeadException {
if (!isRunning()) return; if (!isRunning()) return;
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("ReportOnlineRegions " + serverName + " regionCount=" + regionNames.size() + LOG.trace("ReportOnlineRegions " + serverName + " regionCount=" + regionNames.size() +
@ -965,9 +965,7 @@ public class AssignmentManager implements ServerListener {
final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
// update the server version number. This will be used for live upgrades.
synchronized (serverNode) { synchronized (serverNode) {
serverNode.setVersionNumber(versionNumber);
if (serverNode.isInState(ServerState.SPLITTING, ServerState.OFFLINE)) { if (serverNode.isInState(ServerState.SPLITTING, ServerState.OFFLINE)) {
LOG.warn("Got a report from a server result in state " + serverNode.getState()); LOG.warn("Got a report from a server result in state " + serverNode.getState());
return; return;
@ -1910,11 +1908,6 @@ public class AssignmentManager implements ServerListener {
wakeServerReportEvent(serverNode); wakeServerReportEvent(serverNode);
} }
public int getServerVersion(final ServerName serverName) {
final ServerStateNode node = regionStates.getServerNode(serverName);
return node != null ? node.getVersionNumber() : 0;
}
private void killRegionServer(final ServerName serverName) { private void killRegionServer(final ServerName serverName) {
final ServerStateNode serverNode = regionStates.getServerNode(serverName); final ServerStateNode serverNode = regionStates.getServerNode(serverName);
killRegionServer(serverNode); killRegionServer(serverNode);

View File

@ -338,7 +338,6 @@ public class RegionStates {
private final ServerName serverName; private final ServerName serverName;
private volatile ServerState state = ServerState.ONLINE; private volatile ServerState state = ServerState.ONLINE;
private volatile int versionNumber = 0;
public ServerStateNode(final ServerName serverName) { public ServerStateNode(final ServerName serverName) {
this.serverName = serverName; this.serverName = serverName;
@ -354,10 +353,6 @@ public class RegionStates {
return state; return state;
} }
public int getVersionNumber() {
return versionNumber;
}
public ProcedureEvent<?> getReportEvent() { public ProcedureEvent<?> getReportEvent() {
return reportEvent; return reportEvent;
} }
@ -380,10 +375,6 @@ public class RegionStates {
this.state = state; this.state = state;
} }
public void setVersionNumber(final int versionNumber) {
this.versionNumber = versionNumber;
}
public Set<RegionStateNode> getRegions() { public Set<RegionStateNode> getRegions() {
return regions; return regions;
} }

View File

@ -107,7 +107,7 @@ public class RSProcedureDispatcher
@Override @Override
protected void remoteDispatch(final ServerName serverName, protected void remoteDispatch(final ServerName serverName,
final Set<RemoteProcedure> remoteProcedures) { final Set<RemoteProcedure> remoteProcedures) {
final int rsVersion = master.getAssignmentManager().getServerVersion(serverName); final int rsVersion = master.getServerManager().getServerVersion(serverName);
if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) { if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) {
LOG.trace("Using procedure batch rpc execution for serverName={} version={}", LOG.trace("Using procedure batch rpc execution for serverName={} version={}",
serverName, rsVersion); serverName, rsVersion);

View File

@ -66,7 +66,7 @@ public class TestClockSkewDetection {
request.setPort(1234); request.setPort(1234);
request.setServerStartCode(-1); request.setServerStartCode(-1);
request.setServerCurrentTime(System.currentTimeMillis()); request.setServerCurrentTime(System.currentTimeMillis());
sm.regionServerStartup(request.build(), ia1); sm.regionServerStartup(request.build(), 0, ia1);
final Configuration c = HBaseConfiguration.create(); final Configuration c = HBaseConfiguration.create();
long maxSkew = c.getLong("hbase.master.maxclockskew", 30000); long maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
@ -81,7 +81,7 @@ public class TestClockSkewDetection {
request.setPort(1235); request.setPort(1235);
request.setServerStartCode(-1); request.setServerStartCode(-1);
request.setServerCurrentTime(System.currentTimeMillis() - maxSkew * 2); request.setServerCurrentTime(System.currentTimeMillis() - maxSkew * 2);
sm.regionServerStartup(request.build(), ia2); sm.regionServerStartup(request.build(), 0, ia2);
fail("HMaster should have thrown a ClockOutOfSyncException but didn't."); fail("HMaster should have thrown a ClockOutOfSyncException but didn't.");
} catch(ClockOutOfSyncException e) { } catch(ClockOutOfSyncException e) {
//we want an exception //we want an exception
@ -97,7 +97,7 @@ public class TestClockSkewDetection {
request.setPort(1236); request.setPort(1236);
request.setServerStartCode(-1); request.setServerStartCode(-1);
request.setServerCurrentTime(System.currentTimeMillis() + maxSkew * 2); request.setServerCurrentTime(System.currentTimeMillis() + maxSkew * 2);
sm.regionServerStartup(request.build(), ia3); sm.regionServerStartup(request.build(), 0, ia3);
fail("HMaster should have thrown a ClockOutOfSyncException but didn't."); fail("HMaster should have thrown a ClockOutOfSyncException but didn't.");
} catch (ClockOutOfSyncException e) { } catch (ClockOutOfSyncException e) {
// we want an exception // we want an exception
@ -111,7 +111,7 @@ public class TestClockSkewDetection {
request.setPort(1237); request.setPort(1237);
request.setServerStartCode(-1); request.setServerStartCode(-1);
request.setServerCurrentTime(System.currentTimeMillis() - warningSkew * 2); request.setServerCurrentTime(System.currentTimeMillis() - warningSkew * 2);
sm.regionServerStartup(request.build(), ia4); sm.regionServerStartup(request.build(), 0, ia4);
// make sure values above warning threshold but below max threshold don't kill // make sure values above warning threshold but below max threshold don't kill
LOG.debug("regionServerStartup 5"); LOG.debug("regionServerStartup 5");
@ -120,9 +120,6 @@ public class TestClockSkewDetection {
request.setPort(1238); request.setPort(1238);
request.setServerStartCode(-1); request.setServerStartCode(-1);
request.setServerCurrentTime(System.currentTimeMillis() + warningSkew * 2); request.setServerCurrentTime(System.currentTimeMillis() + warningSkew * 2);
sm.regionServerStartup(request.build(), ia5); sm.regionServerStartup(request.build(), 0, ia5);
} }
} }

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -211,4 +212,31 @@ public class TestRestartCluster {
assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode()); assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
} }
} }
/**
 * Checks that the master reports a non-zero version for every online region server while new
 * region servers are being started, and once more after all of them have been started.
 * <p>
 * A failure to start a region server fails the test instead of only being logged.
 */
@Test
public void testNewStartedRegionServerVersion() throws Exception {
  UTIL.startMiniCluster(1);

  // First failure seen by the starter thread; checked after the thread has finished so that a
  // broken startup cannot let the test pass without exercising anything.
  final java.util.concurrent.atomic.AtomicReference<Throwable> startFailure =
      new java.util.concurrent.atomic.AtomicReference<>();

  // Start 3 new region servers in the background
  Thread t = new Thread(() -> {
    for (int i = 0; i < 3; i++) {
      try {
        JVMClusterUtil.RegionServerThread newRS = UTIL.getMiniHBaseCluster().startRegionServer();
        newRS.waitForServerOnline();
      } catch (IOException | RuntimeException e) {
        LOG.error("Failed to start a new RS", e);
        startFailure.compareAndSet(null, e);
        return;
      }
    }
  });
  t.start();

  HMaster master = UTIL.getMiniHBaseCluster().getMaster();
  // Poll while the servers are coming up: a server must never be visible with version 0.
  while (t.isAlive()) {
    List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
    for (ServerName serverName : serverNames) {
      assertNotEquals(0, master.getServerManager().getServerVersion(serverName));
    }
    Thread.sleep(100);
  }
  t.join();

  Throwable failure = startFailure.get();
  if (failure != null) {
    throw new AssertionError("Failed to start a new RS", failure);
  }

  // Final check: guarantees at least one assertion pass even if the starter thread finished
  // before the first poll, and covers the servers that came online last.
  for (ServerName serverName : master.getServerManager().getOnlineServersList()) {
    assertNotEquals(0, master.getServerManager().getServerVersion(serverName));
  }
}
} }

View File

@ -125,7 +125,7 @@ public class MockMasterServices extends MockNoopMasterServices {
// Make a report with current state of the server 'serverName' before we call wait.. // Make a report with current state of the server 'serverName' before we call wait..
SortedSet<byte []> regions = regionsToRegionServers.get(serverName); SortedSet<byte []> regions = regionsToRegionServers.get(serverName);
try { try {
getAssignmentManager().reportOnlineRegions(serverName, 0, getAssignmentManager().reportOnlineRegions(serverName,
regions == null? new HashSet<byte []>(): regions); regions == null? new HashSet<byte []>(): regions);
} catch (YouAreDeadException e) { } catch (YouAreDeadException e) {
throw new RuntimeException(e); throw new RuntimeException(e);