diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java index f33e9782054..680b92470ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java @@ -32,6 +32,14 @@ import org.apache.yetus.audience.InterfaceAudience; public interface ServerMetrics { ServerName getServerName(); + + /** + * @return the version number of a regionserver. + */ + default int getVersionNumber() { + return 0; + } + /** * @return the number of requests per second. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java index da9e7243ad0..50addf6bddf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java @@ -48,14 +48,23 @@ public final class ServerMetricsBuilder { return newBuilder(sn).build(); } + public static ServerMetrics of(ServerName sn, int versionNumber) { + return newBuilder(sn).setVersionNumber(versionNumber).build(); + } + public static ServerMetrics toServerMetrics(ClusterStatusProtos.LiveServerInfo serverInfo) { - return toServerMetrics(ProtobufUtil.toServerName(serverInfo.getServer()), + return toServerMetrics(ProtobufUtil.toServerName(serverInfo.getServer()), 0, serverInfo.getServerLoad()); } public static ServerMetrics toServerMetrics(ServerName serverName, ClusterStatusProtos.ServerLoad serverLoadPB) { - return ServerMetricsBuilder.newBuilder(serverName) + return toServerMetrics(serverName, 0, serverLoadPB); + } + + public static ServerMetrics toServerMetrics(ServerName serverName, int versionNumber, + ClusterStatusProtos.ServerLoad serverLoadPB) { + return ServerMetricsBuilder.newBuilder(serverName).setVersionNumber(versionNumber) .setRequestCountPerSecond(serverLoadPB.getNumberOfRequests()) .setRequestCount(serverLoadPB.getTotalNumberOfRequests()) .setInfoServerPort(serverLoadPB.getInfoServerPort()) @@ -110,6 +119,7 @@ public final class ServerMetricsBuilder { } private final ServerName serverName; + private int versionNumber; private long requestCountPerSecond; private long requestCount; private Size usedHeapSize = Size.ZERO; @@ -126,6 +136,11 @@ public final class ServerMetricsBuilder { this.serverName = serverName; } + public ServerMetricsBuilder setVersionNumber(int versionNumber) { + this.versionNumber = versionNumber; + return this; + } + public ServerMetricsBuilder setRequestCountPerSecond(long value) { this.requestCountPerSecond = value; return this; @@ -184,6 +199,7 @@ public final class ServerMetricsBuilder { public ServerMetrics build() { return new ServerMetricsImpl( serverName, + versionNumber, requestCountPerSecond, requestCount, usedHeapSize, @@ -199,6 +215,7 @@ public final class ServerMetricsBuilder { private static class ServerMetricsImpl implements ServerMetrics { private final ServerName serverName; + private final int versionNumber; private final long requestCountPerSecond; private final long requestCount; private final Size usedHeapSize; @@ -212,11 +229,13 @@ public final class ServerMetricsBuilder { private final long reportTimestamp; private final long lastReportTimestamp; - ServerMetricsImpl(ServerName serverName, long requestCountPerSecond, long requestCount, - Size usedHeapSize, Size maxHeapSize, int infoServerPort, List sources, - ReplicationLoadSink sink, Map regionStatus, - Set coprocessorNames, long reportTimestamp, long lastReportTimestamp) { + ServerMetricsImpl(ServerName serverName, int versionNumber, long requestCountPerSecond, + long requestCount, Size usedHeapSize, Size maxHeapSize, int infoServerPort, + List sources, ReplicationLoadSink sink, + Map regionStatus, Set coprocessorNames, long reportTimestamp, + long lastReportTimestamp) { this.serverName = Preconditions.checkNotNull(serverName); + this.versionNumber = versionNumber; this.requestCountPerSecond = requestCountPerSecond; this.requestCount = requestCount; this.usedHeapSize = Preconditions.checkNotNull(usedHeapSize); @@ -234,6 +253,12 @@ public final class ServerMetricsBuilder { public ServerName getServerName() { return serverName; } + + @Override + public int getVersionNumber() { + return versionNumber; + } + @Override public long getRequestCountPerSecond() { return requestCountPerSecond; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 932d490c82a..a097b9e8b7f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -452,14 +452,14 @@ public class MasterRpcServices extends RSRpcServices RpcController controller, RegionServerReportRequest request) throws ServiceException { try { master.checkServiceStarted(); + int version = VersionInfoUtil.getCurrentClientVersionNumber(); ClusterStatusProtos.ServerLoad sl = request.getLoad(); ServerName serverName = ProtobufUtil.toServerName(request.getServer()); ServerMetrics oldLoad = master.getServerManager().getLoad(serverName); - ServerMetrics newLoad = ServerMetricsBuilder.toServerMetrics(serverName, sl); + ServerMetrics newLoad = ServerMetricsBuilder.toServerMetrics(serverName, version, sl); master.getServerManager().regionServerReport(serverName, newLoad); - int version = VersionInfoUtil.getCurrentClientVersionNumber(); - master.getAssignmentManager().reportOnlineRegions(serverName, - version, newLoad.getRegionMetrics().keySet()); + master.getAssignmentManager() + .reportOnlineRegions(serverName, newLoad.getRegionMetrics().keySet()); if (sl != null && master.metricsMaster != null) { // Up our metrics. master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests() @@ -477,11 +477,12 @@ public class MasterRpcServices extends RSRpcServices // Register with server manager try { master.checkServiceStarted(); + int version = VersionInfoUtil.getCurrentClientVersionNumber(); InetAddress ia = master.getRemoteInetAddress( request.getPort(), request.getServerStartCode()); // if regionserver passed hostname to use, // then use it instead of doing a reverse DNS lookup - ServerName rs = master.getServerManager().regionServerStartup(request, ia); + ServerName rs = master.getServerManager().regionServerStartup(request, version, ia); // Send back some config info RegionServerStartupResponse.Builder resp = createConfigurationSubset(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 7eda0576640..b2a6a723315 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -222,11 +222,13 @@ public class ServerManager { /** * Let the server manager know a new regionserver has come online * @param request the startup request + * @param versionNumber the version of the new regionserver * @param ia the InetAddress from which request is received * @return The ServerName we know this server as. * @throws IOException */ - ServerName regionServerStartup(RegionServerStartupRequest request, InetAddress ia) + ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber, + InetAddress ia) throws IOException { // Test for case where we get a region startup message from a regionserver // that has been quickly restarted but whose znode expiration handler has @@ -242,7 +244,7 @@ public class ServerManager { request.getServerStartCode()); checkClockSkew(sn, request.getServerCurrentTime()); checkIsDead(sn, "STARTUP"); - if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn))) { + if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber))) { LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn); } @@ -1058,4 +1060,9 @@ public class ServerManager { removeRegion(hri); } } + + public int getServerVersion(final ServerName serverName) { + ServerMetrics serverMetrics = onlineServers.get(serverName); + return serverMetrics != null ? serverMetrics.getVersionNumber() : 0; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index 3c9b0d3d030..1f20e889f92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -913,7 +913,7 @@ public class AssignmentManager implements ServerListener { master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey)); // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split - if (regionStates.getOrCreateServer(serverName).getVersionNumber() < 0x0200000) { + if (master.getServerManager().getServerVersion(serverName) < 0x0200000) { throw new UnsupportedOperationException(String.format( "Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB)); } @@ -936,7 +936,7 @@ public class AssignmentManager implements ServerListener { master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB)); // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge - if (regionStates.getOrCreateServer(serverName).getVersionNumber() < 0x0200000) { + if (master.getServerManager().getServerVersion(serverName) < 0x0200000) { throw new UnsupportedOperationException(String.format( "Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA, hriB)); @@ -948,13 +948,13 @@ public class AssignmentManager implements ServerListener { // ============================================================================================ /** * the master will call this method when the RS send the regionServerReport(). - * the report will contains the "hbase version" and the "online regions". + * the report will contains the "online regions". * this method will check the the online regions against the in-memory state of the AM, * if there is a mismatch we will try to fence out the RS with the assumption * that something went wrong on the RS side. */ - public void reportOnlineRegions(final ServerName serverName, - final int versionNumber, final Set regionNames) throws YouAreDeadException { + public void reportOnlineRegions(final ServerName serverName, final Set regionNames) + throws YouAreDeadException { if (!isRunning()) return; if (LOG.isTraceEnabled()) { LOG.trace("ReportOnlineRegions " + serverName + " regionCount=" + regionNames.size() + @@ -965,9 +965,7 @@ public class AssignmentManager implements ServerListener { final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); - // update the server version number. This will be used for live upgrades. synchronized (serverNode) { - serverNode.setVersionNumber(versionNumber); if (serverNode.isInState(ServerState.SPLITTING, ServerState.OFFLINE)) { LOG.warn("Got a report from a server result in state " + serverNode.getState()); return; @@ -1910,11 +1908,6 @@ public class AssignmentManager implements ServerListener { wakeServerReportEvent(serverNode); } - public int getServerVersion(final ServerName serverName) { - final ServerStateNode node = regionStates.getServerNode(serverName); - return node != null ? node.getVersionNumber() : 0; - } - private void killRegionServer(final ServerName serverName) { final ServerStateNode serverNode = regionStates.getServerNode(serverName); killRegionServer(serverNode); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java index e103f2c3c75..5f0578ed53a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java @@ -338,7 +338,6 @@ public class RegionStates { private final ServerName serverName; private volatile ServerState state = ServerState.ONLINE; - private volatile int versionNumber = 0; public ServerStateNode(final ServerName serverName) { this.serverName = serverName; @@ -354,10 +353,6 @@ public class RegionStates { return state; } - public int getVersionNumber() { - return versionNumber; - } - public ProcedureEvent getReportEvent() { return reportEvent; } @@ -380,10 +375,6 @@ public class RegionStates { this.state = state; } - public void setVersionNumber(final int versionNumber) { - this.versionNumber = versionNumber; - } - public Set getRegions() { return regions; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java index 358fd61c70b..5693952e620 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java @@ -107,7 +107,7 @@ public class RSProcedureDispatcher @Override protected void remoteDispatch(final ServerName serverName, final Set remoteProcedures) { - final int rsVersion = master.getAssignmentManager().getServerVersion(serverName); + final int rsVersion = master.getServerManager().getServerVersion(serverName); if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) { LOG.trace("Using procedure batch rpc execution for serverName={} version={}", serverName, rsVersion); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java index 3df4b3fbcae..c68f94dadf1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java @@ -66,7 +66,7 @@ public class TestClockSkewDetection { request.setPort(1234); request.setServerStartCode(-1); request.setServerCurrentTime(System.currentTimeMillis()); - sm.regionServerStartup(request.build(), ia1); + sm.regionServerStartup(request.build(), 0, ia1); final Configuration c = HBaseConfiguration.create(); long maxSkew = c.getLong("hbase.master.maxclockskew", 30000); @@ -81,7 +81,7 @@ public class TestClockSkewDetection { request.setPort(1235); request.setServerStartCode(-1); request.setServerCurrentTime(System.currentTimeMillis() - maxSkew * 2); - sm.regionServerStartup(request.build(), ia2); + sm.regionServerStartup(request.build(), 0, ia2); fail("HMaster should have thrown a ClockOutOfSyncException but didn't."); } catch(ClockOutOfSyncException e) { //we want an exception @@ -97,7 +97,7 @@ public class TestClockSkewDetection { request.setPort(1236); request.setServerStartCode(-1); request.setServerCurrentTime(System.currentTimeMillis() + maxSkew * 2); - sm.regionServerStartup(request.build(), ia3); + sm.regionServerStartup(request.build(), 0, ia3); fail("HMaster should have thrown a ClockOutOfSyncException but didn't."); } catch (ClockOutOfSyncException e) { // we want an exception @@ -111,7 +111,7 @@ public class TestClockSkewDetection { request.setPort(1237); request.setServerStartCode(-1); request.setServerCurrentTime(System.currentTimeMillis() - warningSkew * 2); - sm.regionServerStartup(request.build(), ia4); + sm.regionServerStartup(request.build(), 0, ia4); // make sure values above warning threshold but below max threshold don't kill LOG.debug("regionServerStartup 5"); @@ -120,9 +120,6 @@ public class TestClockSkewDetection { request.setPort(1238); request.setServerStartCode(-1); request.setServerCurrentTime(System.currentTimeMillis() + warningSkew * 2); - sm.regionServerStartup(request.build(), ia5); - + sm.regionServerStartup(request.build(), 0, ia5); } - -} - +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java index 088dff51dbe..8161a6343b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -211,4 +212,31 @@ public class TestRestartCluster { assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode()); } } + + @Test + public void testNewStartedRegionServerVersion() throws Exception { + UTIL.startMiniCluster(1); + + // Start 3 new region server + Thread t = new Thread(() -> { + for (int i = 0; i < 3; i++) { + try { + JVMClusterUtil.RegionServerThread newRS = UTIL.getMiniHBaseCluster().startRegionServer(); + newRS.waitForServerOnline(); + } catch (IOException e) { + LOG.error("Failed to start a new RS", e); + } + } + }); + t.start(); + + HMaster master = UTIL.getMiniHBaseCluster().getMaster(); + while (t.isAlive()) { + List serverNames = master.getServerManager().getOnlineServersList(); + for (ServerName serverName : serverNames) { + assertNotEquals(0, master.getServerManager().getServerVersion(serverName)); + } + Thread.sleep(100); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java index fb75001cb36..2272bec8b59 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java @@ -125,7 +125,7 @@ public class MockMasterServices extends MockNoopMasterServices { // Make a report with current state of the server 'serverName' before we call wait.. SortedSet regions = regionsToRegionServers.get(serverName); try { - getAssignmentManager().reportOnlineRegions(serverName, 0, + getAssignmentManager().reportOnlineRegions(serverName, regions == null? new HashSet(): regions); } catch (YouAreDeadException e) { throw new RuntimeException(e);