HBASE-20784 Will lose the SNAPSHOT suffix if we get the version of RS from ServerManager

This commit is contained in:
zhangduo 2018-07-06 15:10:22 +08:00
parent ad5b4af2c4
commit d7561cee50
11 changed files with 112 additions and 83 deletions

View File

@ -40,6 +40,13 @@ public interface ServerMetrics {
return 0;
}
/**
* @return the string type version of a regionserver.
*/
default String getVersion() {
return "0.0.0";
}
/**
* @return the number of requests per second.
*/

View File

@ -48,41 +48,40 @@ public final class ServerMetricsBuilder {
return newBuilder(sn).build();
}
public static ServerMetrics of(ServerName sn, int versionNumber) {
return newBuilder(sn).setVersionNumber(versionNumber).build();
public static ServerMetrics of(ServerName sn, int versionNumber, String version) {
return newBuilder(sn).setVersionNumber(versionNumber).setVersion(version).build();
}
public static ServerMetrics toServerMetrics(ClusterStatusProtos.LiveServerInfo serverInfo) {
return toServerMetrics(ProtobufUtil.toServerName(serverInfo.getServer()), 0,
serverInfo.getServerLoad());
return toServerMetrics(ProtobufUtil.toServerName(serverInfo.getServer()), 0, "0.0.0",
serverInfo.getServerLoad());
}
public static ServerMetrics toServerMetrics(ServerName serverName,
ClusterStatusProtos.ServerLoad serverLoadPB) {
return toServerMetrics(serverName, 0, serverLoadPB);
return toServerMetrics(serverName, 0, "0.0.0", serverLoadPB);
}
public static ServerMetrics toServerMetrics(ServerName serverName, int versionNumber,
ClusterStatusProtos.ServerLoad serverLoadPB) {
return ServerMetricsBuilder.newBuilder(serverName).setVersionNumber(versionNumber)
.setRequestCountPerSecond(serverLoadPB.getNumberOfRequests())
.setRequestCount(serverLoadPB.getTotalNumberOfRequests())
.setInfoServerPort(serverLoadPB.getInfoServerPort())
.setMaxHeapSize(new Size(serverLoadPB.getMaxHeapMB(), Size.Unit.MEGABYTE))
.setUsedHeapSize(new Size(serverLoadPB.getUsedHeapMB(), Size.Unit.MEGABYTE))
.setCoprocessorNames(serverLoadPB.getCoprocessorsList().stream()
.map(HBaseProtos.Coprocessor::getName).collect(Collectors.toList()))
.setRegionMetrics(serverLoadPB.getRegionLoadsList().stream()
.map(RegionMetricsBuilder::toRegionMetrics)
.collect(Collectors.toList()))
.setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream()
.map(ProtobufUtil::toReplicationLoadSource)
.collect(Collectors.toList()))
.setReplicationLoadSink(serverLoadPB.hasReplLoadSink() ?
ProtobufUtil.toReplicationLoadSink(serverLoadPB.getReplLoadSink()) : null)
.setReportTimestamp(serverLoadPB.getReportEndTime())
.setLastReportTimestamp(serverLoadPB.getReportStartTime())
.build();
String version, ClusterStatusProtos.ServerLoad serverLoadPB) {
return ServerMetricsBuilder.newBuilder(serverName)
.setRequestCountPerSecond(serverLoadPB.getNumberOfRequests())
.setRequestCount(serverLoadPB.getTotalNumberOfRequests())
.setInfoServerPort(serverLoadPB.getInfoServerPort())
.setMaxHeapSize(new Size(serverLoadPB.getMaxHeapMB(), Size.Unit.MEGABYTE))
.setUsedHeapSize(new Size(serverLoadPB.getUsedHeapMB(), Size.Unit.MEGABYTE))
.setCoprocessorNames(serverLoadPB.getCoprocessorsList().stream()
.map(HBaseProtos.Coprocessor::getName).collect(Collectors.toList()))
.setRegionMetrics(serverLoadPB.getRegionLoadsList().stream()
.map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList()))
.setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream()
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
.setReplicationLoadSink(serverLoadPB.hasReplLoadSink()
? ProtobufUtil.toReplicationLoadSink(serverLoadPB.getReplLoadSink())
: null)
.setReportTimestamp(serverLoadPB.getReportEndTime())
.setLastReportTimestamp(serverLoadPB.getReportStartTime()).setVersionNumber(versionNumber)
.setVersion(version).build();
}
public static List<HBaseProtos.Coprocessor> toCoprocessor(Collection<String> names) {
@ -120,6 +119,7 @@ public final class ServerMetricsBuilder {
private final ServerName serverName;
private int versionNumber;
private String version = "0.0.0";
private long requestCountPerSecond;
private long requestCount;
private Size usedHeapSize = Size.ZERO;
@ -141,6 +141,11 @@ public final class ServerMetricsBuilder {
return this;
}
public ServerMetricsBuilder setVersion(String version) {
this.version = version;
return this;
}
public ServerMetricsBuilder setRequestCountPerSecond(long value) {
this.requestCountPerSecond = value;
return this;
@ -200,6 +205,7 @@ public final class ServerMetricsBuilder {
return new ServerMetricsImpl(
serverName,
versionNumber,
version,
requestCountPerSecond,
requestCount,
usedHeapSize,
@ -216,6 +222,7 @@ public final class ServerMetricsBuilder {
private static class ServerMetricsImpl implements ServerMetrics {
private final ServerName serverName;
private final int versionNumber;
private final String version;
private final long requestCountPerSecond;
private final long requestCount;
private final Size usedHeapSize;
@ -229,13 +236,14 @@ public final class ServerMetricsBuilder {
private final long reportTimestamp;
private final long lastReportTimestamp;
ServerMetricsImpl(ServerName serverName, int versionNumber, long requestCountPerSecond,
long requestCount, Size usedHeapSize, Size maxHeapSize, int infoServerPort,
List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
long requestCountPerSecond, long requestCount, Size usedHeapSize, Size maxHeapSize,
int infoServerPort, List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
long lastReportTimestamp) {
this.serverName = Preconditions.checkNotNull(serverName);
this.versionNumber = versionNumber;
this.version = version;
this.requestCountPerSecond = requestCountPerSecond;
this.requestCount = requestCount;
this.usedHeapSize = Preconditions.checkNotNull(usedHeapSize);
@ -259,6 +267,10 @@ public final class ServerMetricsBuilder {
return versionNumber;
}
public String getVersion() {
return version;
}
@Override
public long getRequestCountPerSecond() {
return requestCountPerSecond;

View File

@ -70,18 +70,10 @@ public final class VersionInfoUtil {
/**
* @return the versionInfo extracted from the current RpcCallContext
*/
private static HBaseProtos.VersionInfo getCurrentClientVersionInfo() {
public static HBaseProtos.VersionInfo getCurrentClientVersionInfo() {
return RpcServer.getCurrentCall().map(RpcCallContext::getClientVersionInfo).orElse(null);
}
/**
* @return the version number extracted from the current RpcCallContext as int.
* (e.g. 0x0103004 is 1.3.4)
*/
public static int getCurrentClientVersionNumber() {
return getVersionNumber(getCurrentClientVersionInfo());
}
/**
* @param version

View File

@ -88,7 +88,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
@ -2707,11 +2706,10 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
public String getRegionServerVersion(final ServerName sn) {
// Will return 0 if the server is not online to prevent move system region to unknown version
// RS.
int versionNumber = this.serverManager.getServerVersion(sn);
return VersionInfoUtil.versionNumberToString(versionNumber);
public String getRegionServerVersion(ServerName sn) {
// Will return "0.0.0" if the server is not online to prevent move system region to unknown
// version RS.
return this.serverManager.getVersion(sn);
}
@Override

View File

@ -118,6 +118,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
@ -452,22 +453,29 @@ public class MasterRpcServices extends RSRpcServices
}
@Override
public RegionServerReportResponse regionServerReport(
RpcController controller, RegionServerReportRequest request) throws ServiceException {
public RegionServerReportResponse regionServerReport(RpcController controller,
RegionServerReportRequest request) throws ServiceException {
try {
master.checkServiceStarted();
int version = VersionInfoUtil.getCurrentClientVersionNumber();
int versionNumber = 0;
String version = "0.0.0";
VersionInfo versionInfo = VersionInfoUtil.getCurrentClientVersionInfo();
if (versionInfo != null) {
version = versionInfo.getVersion();
versionNumber = VersionInfoUtil.getVersionNumber(versionInfo);
}
ClusterStatusProtos.ServerLoad sl = request.getLoad();
ServerName serverName = ProtobufUtil.toServerName(request.getServer());
ServerMetrics oldLoad = master.getServerManager().getLoad(serverName);
ServerMetrics newLoad = ServerMetricsBuilder.toServerMetrics(serverName, version, sl);
ServerMetrics newLoad =
ServerMetricsBuilder.toServerMetrics(serverName, versionNumber, version, sl);
master.getServerManager().regionServerReport(serverName, newLoad);
master.getAssignmentManager()
.reportOnlineRegions(serverName, newLoad.getRegionMetrics().keySet());
master.getAssignmentManager().reportOnlineRegions(serverName,
newLoad.getRegionMetrics().keySet());
if (sl != null && master.metricsMaster != null) {
// Up our metrics.
master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests()
- (oldLoad != null ? oldLoad.getRequestCount() : 0));
master.metricsMaster.incrementRequests(
sl.getTotalNumberOfRequests() - (oldLoad != null ? oldLoad.getRequestCount() : 0));
}
} catch (IOException ioe) {
throw new ServiceException(ioe);
@ -476,23 +484,28 @@ public class MasterRpcServices extends RSRpcServices
}
@Override
public RegionServerStartupResponse regionServerStartup(
RpcController controller, RegionServerStartupRequest request) throws ServiceException {
public RegionServerStartupResponse regionServerStartup(RpcController controller,
RegionServerStartupRequest request) throws ServiceException {
// Register with server manager
try {
master.checkServiceStarted();
int version = VersionInfoUtil.getCurrentClientVersionNumber();
InetAddress ia = master.getRemoteInetAddress(
request.getPort(), request.getServerStartCode());
int versionNumber = 0;
String version = "0.0.0";
VersionInfo versionInfo = VersionInfoUtil.getCurrentClientVersionInfo();
if (versionInfo != null) {
version = versionInfo.getVersion();
versionNumber = VersionInfoUtil.getVersionNumber(versionInfo);
}
InetAddress ia = master.getRemoteInetAddress(request.getPort(), request.getServerStartCode());
// if regionserver passed hostname to use,
// then use it instead of doing a reverse DNS lookup
ServerName rs = master.getServerManager().regionServerStartup(request, version, ia);
ServerName rs =
master.getServerManager().regionServerStartup(request, versionNumber, version, ia);
// Send back some config info
RegionServerStartupResponse.Builder resp = createConfigurationSubset();
NameStringPair.Builder entry = NameStringPair.newBuilder()
.setName(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)
.setValue(rs.getHostname());
.setName(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER).setValue(rs.getHostname());
resp.addMapEntries(entry.build());
return resp.build();

View File

@ -127,10 +127,9 @@ public class RegionServerTracker extends ZKListener {
ServerName serverName = pair.getFirst();
RegionServerInfo info = pair.getSecond();
regionServers.add(serverName);
ServerMetrics serverMetrics = info != null
? ServerMetricsBuilder.of(serverName,
VersionInfoUtil.getVersionNumber(info.getVersionInfo()))
: ServerMetricsBuilder.of(serverName);
ServerMetrics serverMetrics = info != null ? ServerMetricsBuilder.of(serverName,
VersionInfoUtil.getVersionNumber(info.getVersionInfo()),
info.getVersionInfo().getVersion()) : ServerMetricsBuilder.of(serverName);
serverManager.checkAndRecordNewServer(serverName, serverMetrics);
}
serverManager.findOutDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir);

View File

@ -214,30 +214,30 @@ public class ServerManager {
/**
* Let the server manager know a new regionserver has come online
* @param request the startup request
* @param versionNumber the version of the new regionserver
* @param versionNumber the version number of the new regionserver
* @param version the version of the new regionserver, could contain strings like "SNAPSHOT"
* @param ia the InetAddress from which request is received
* @return The ServerName we know this server as.
* @throws IOException
*/
ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
InetAddress ia) throws IOException {
String version, InetAddress ia) throws IOException {
// Test for case where we get a region startup message from a regionserver
// that has been quickly restarted but whose znode expiration handler has
// not yet run, or from a server whose fail we are currently processing.
// Test its host+port combo is present in serverAddressToServerInfo. If it
// Test its host+port combo is present in serverAddressToServerInfo. If it
// is, reject the server and trigger its expiration. The next time it comes
// in, it should have been removed from serverAddressToServerInfo and queued
// for processing by ProcessServerShutdown.
final String hostname = request.hasUseThisHostnameInstead() ?
request.getUseThisHostnameInstead() :ia.getHostName();
ServerName sn = ServerName.valueOf(hostname, request.getPort(),
request.getServerStartCode());
final String hostname =
request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : ia.getHostName();
ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
checkClockSkew(sn, request.getServerCurrentTime());
checkIsDead(sn, "STARTUP");
if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber))) {
LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
+ " could not record the server: " + sn);
if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
LOG.warn(
"THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
}
return sn;
}
@ -1021,11 +1021,19 @@ public class ServerManager {
/**
* May return 0 when server is not online.
*/
public int getServerVersion(final ServerName serverName) {
public int getVersionNumber(ServerName serverName) {
ServerMetrics serverMetrics = onlineServers.get(serverName);
return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
}
/**
* May return "0.0.0" when server is not online
*/
public String getVersion(ServerName serverName) {
ServerMetrics serverMetrics = onlineServers.get(serverName);
return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
}
public int getInfoPort(ServerName serverName) {
ServerMetrics serverMetrics = onlineServers.get(serverName);
return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;

View File

@ -898,7 +898,7 @@ public class AssignmentManager implements ServerListener {
master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
// If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
if (master.getServerManager().getServerVersion(serverName) < 0x0200000) {
if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
throw new UnsupportedOperationException(String.format(
"Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB));
}
@ -921,7 +921,7 @@ public class AssignmentManager implements ServerListener {
master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
// If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
if (master.getServerManager().getServerVersion(serverName) < 0x0200000) {
if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
throw new UnsupportedOperationException(String.format(
"Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA,
hriB));

View File

@ -108,7 +108,7 @@ public class RSProcedureDispatcher
@Override
protected void remoteDispatch(final ServerName serverName,
final Set<RemoteProcedure> remoteProcedures) {
final int rsVersion = master.getServerManager().getServerVersion(serverName);
final int rsVersion = master.getServerManager().getVersionNumber(serverName);
if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) {
LOG.trace("Using procedure batch rpc execution for serverName={} version={}", serverName,
rsVersion);

View File

@ -66,7 +66,7 @@ public class TestClockSkewDetection {
request.setPort(1234);
request.setServerStartCode(-1);
request.setServerCurrentTime(System.currentTimeMillis());
sm.regionServerStartup(request.build(), 0, ia1);
sm.regionServerStartup(request.build(), 0, "0.0.0", ia1);
final Configuration c = HBaseConfiguration.create();
long maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
@ -81,7 +81,7 @@ public class TestClockSkewDetection {
request.setPort(1235);
request.setServerStartCode(-1);
request.setServerCurrentTime(System.currentTimeMillis() - maxSkew * 2);
sm.regionServerStartup(request.build(), 0, ia2);
sm.regionServerStartup(request.build(), 0, "0.0.0", ia2);
fail("HMaster should have thrown a ClockOutOfSyncException but didn't.");
} catch(ClockOutOfSyncException e) {
//we want an exception
@ -97,7 +97,7 @@ public class TestClockSkewDetection {
request.setPort(1236);
request.setServerStartCode(-1);
request.setServerCurrentTime(System.currentTimeMillis() + maxSkew * 2);
sm.regionServerStartup(request.build(), 0, ia3);
sm.regionServerStartup(request.build(), 0, "0.0.0", ia3);
fail("HMaster should have thrown a ClockOutOfSyncException but didn't.");
} catch (ClockOutOfSyncException e) {
// we want an exception
@ -111,7 +111,7 @@ public class TestClockSkewDetection {
request.setPort(1237);
request.setServerStartCode(-1);
request.setServerCurrentTime(System.currentTimeMillis() - warningSkew * 2);
sm.regionServerStartup(request.build(), 0, ia4);
sm.regionServerStartup(request.build(), 0, "0.0.0", ia4);
// make sure values above warning threshold but below max threshold don't kill
LOG.debug("regionServerStartup 5");
@ -120,6 +120,6 @@ public class TestClockSkewDetection {
request.setPort(1238);
request.setServerStartCode(-1);
request.setServerCurrentTime(System.currentTimeMillis() + warningSkew * 2);
sm.regionServerStartup(request.build(), 0, ia5);
sm.regionServerStartup(request.build(), 0, "0.0.0", ia5);
}
}

View File

@ -234,7 +234,7 @@ public class TestRestartCluster {
while (t.isAlive()) {
List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
for (ServerName serverName : serverNames) {
assertNotEquals(0, master.getServerManager().getServerVersion(serverName));
assertNotEquals(0, master.getServerManager().getVersionNumber(serverName));
}
Thread.sleep(100);
}