HBASE-20784 Will lose the SNAPSHOT suffix if we get the version of RS from ServerManager
This commit is contained in:
parent
dddf15ae6b
commit
5e25bc92cf
|
@ -40,6 +40,13 @@ public interface ServerMetrics {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the string type version of a regionserver.
|
||||||
|
*/
|
||||||
|
default String getVersion() {
|
||||||
|
return "0.0.0";
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the number of requests per second.
|
* @return the number of requests per second.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -48,41 +48,40 @@ public final class ServerMetricsBuilder {
|
||||||
return newBuilder(sn).build();
|
return newBuilder(sn).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ServerMetrics of(ServerName sn, int versionNumber) {
|
public static ServerMetrics of(ServerName sn, int versionNumber, String version) {
|
||||||
return newBuilder(sn).setVersionNumber(versionNumber).build();
|
return newBuilder(sn).setVersionNumber(versionNumber).setVersion(version).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ServerMetrics toServerMetrics(ClusterStatusProtos.LiveServerInfo serverInfo) {
|
public static ServerMetrics toServerMetrics(ClusterStatusProtos.LiveServerInfo serverInfo) {
|
||||||
return toServerMetrics(ProtobufUtil.toServerName(serverInfo.getServer()), 0,
|
return toServerMetrics(ProtobufUtil.toServerName(serverInfo.getServer()), 0, "0.0.0",
|
||||||
serverInfo.getServerLoad());
|
serverInfo.getServerLoad());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ServerMetrics toServerMetrics(ServerName serverName,
|
public static ServerMetrics toServerMetrics(ServerName serverName,
|
||||||
ClusterStatusProtos.ServerLoad serverLoadPB) {
|
ClusterStatusProtos.ServerLoad serverLoadPB) {
|
||||||
return toServerMetrics(serverName, 0, serverLoadPB);
|
return toServerMetrics(serverName, 0, "0.0.0", serverLoadPB);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ServerMetrics toServerMetrics(ServerName serverName, int versionNumber,
|
public static ServerMetrics toServerMetrics(ServerName serverName, int versionNumber,
|
||||||
ClusterStatusProtos.ServerLoad serverLoadPB) {
|
String version, ClusterStatusProtos.ServerLoad serverLoadPB) {
|
||||||
return ServerMetricsBuilder.newBuilder(serverName).setVersionNumber(versionNumber)
|
return ServerMetricsBuilder.newBuilder(serverName)
|
||||||
.setRequestCountPerSecond(serverLoadPB.getNumberOfRequests())
|
.setRequestCountPerSecond(serverLoadPB.getNumberOfRequests())
|
||||||
.setRequestCount(serverLoadPB.getTotalNumberOfRequests())
|
.setRequestCount(serverLoadPB.getTotalNumberOfRequests())
|
||||||
.setInfoServerPort(serverLoadPB.getInfoServerPort())
|
.setInfoServerPort(serverLoadPB.getInfoServerPort())
|
||||||
.setMaxHeapSize(new Size(serverLoadPB.getMaxHeapMB(), Size.Unit.MEGABYTE))
|
.setMaxHeapSize(new Size(serverLoadPB.getMaxHeapMB(), Size.Unit.MEGABYTE))
|
||||||
.setUsedHeapSize(new Size(serverLoadPB.getUsedHeapMB(), Size.Unit.MEGABYTE))
|
.setUsedHeapSize(new Size(serverLoadPB.getUsedHeapMB(), Size.Unit.MEGABYTE))
|
||||||
.setCoprocessorNames(serverLoadPB.getCoprocessorsList().stream()
|
.setCoprocessorNames(serverLoadPB.getCoprocessorsList().stream()
|
||||||
.map(HBaseProtos.Coprocessor::getName).collect(Collectors.toList()))
|
.map(HBaseProtos.Coprocessor::getName).collect(Collectors.toList()))
|
||||||
.setRegionMetrics(serverLoadPB.getRegionLoadsList().stream()
|
.setRegionMetrics(serverLoadPB.getRegionLoadsList().stream()
|
||||||
.map(RegionMetricsBuilder::toRegionMetrics)
|
.map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList()))
|
||||||
.collect(Collectors.toList()))
|
.setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream()
|
||||||
.setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream()
|
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
|
||||||
.map(ProtobufUtil::toReplicationLoadSource)
|
.setReplicationLoadSink(serverLoadPB.hasReplLoadSink()
|
||||||
.collect(Collectors.toList()))
|
? ProtobufUtil.toReplicationLoadSink(serverLoadPB.getReplLoadSink())
|
||||||
.setReplicationLoadSink(serverLoadPB.hasReplLoadSink() ?
|
: null)
|
||||||
ProtobufUtil.toReplicationLoadSink(serverLoadPB.getReplLoadSink()) : null)
|
.setReportTimestamp(serverLoadPB.getReportEndTime())
|
||||||
.setReportTimestamp(serverLoadPB.getReportEndTime())
|
.setLastReportTimestamp(serverLoadPB.getReportStartTime()).setVersionNumber(versionNumber)
|
||||||
.setLastReportTimestamp(serverLoadPB.getReportStartTime())
|
.setVersion(version).build();
|
||||||
.build();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<HBaseProtos.Coprocessor> toCoprocessor(Collection<String> names) {
|
public static List<HBaseProtos.Coprocessor> toCoprocessor(Collection<String> names) {
|
||||||
|
@ -120,6 +119,7 @@ public final class ServerMetricsBuilder {
|
||||||
|
|
||||||
private final ServerName serverName;
|
private final ServerName serverName;
|
||||||
private int versionNumber;
|
private int versionNumber;
|
||||||
|
private String version = "0.0.0";
|
||||||
private long requestCountPerSecond;
|
private long requestCountPerSecond;
|
||||||
private long requestCount;
|
private long requestCount;
|
||||||
private Size usedHeapSize = Size.ZERO;
|
private Size usedHeapSize = Size.ZERO;
|
||||||
|
@ -141,6 +141,11 @@ public final class ServerMetricsBuilder {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ServerMetricsBuilder setVersion(String version) {
|
||||||
|
this.version = version;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public ServerMetricsBuilder setRequestCountPerSecond(long value) {
|
public ServerMetricsBuilder setRequestCountPerSecond(long value) {
|
||||||
this.requestCountPerSecond = value;
|
this.requestCountPerSecond = value;
|
||||||
return this;
|
return this;
|
||||||
|
@ -200,6 +205,7 @@ public final class ServerMetricsBuilder {
|
||||||
return new ServerMetricsImpl(
|
return new ServerMetricsImpl(
|
||||||
serverName,
|
serverName,
|
||||||
versionNumber,
|
versionNumber,
|
||||||
|
version,
|
||||||
requestCountPerSecond,
|
requestCountPerSecond,
|
||||||
requestCount,
|
requestCount,
|
||||||
usedHeapSize,
|
usedHeapSize,
|
||||||
|
@ -216,6 +222,7 @@ public final class ServerMetricsBuilder {
|
||||||
private static class ServerMetricsImpl implements ServerMetrics {
|
private static class ServerMetricsImpl implements ServerMetrics {
|
||||||
private final ServerName serverName;
|
private final ServerName serverName;
|
||||||
private final int versionNumber;
|
private final int versionNumber;
|
||||||
|
private final String version;
|
||||||
private final long requestCountPerSecond;
|
private final long requestCountPerSecond;
|
||||||
private final long requestCount;
|
private final long requestCount;
|
||||||
private final Size usedHeapSize;
|
private final Size usedHeapSize;
|
||||||
|
@ -229,13 +236,14 @@ public final class ServerMetricsBuilder {
|
||||||
private final long reportTimestamp;
|
private final long reportTimestamp;
|
||||||
private final long lastReportTimestamp;
|
private final long lastReportTimestamp;
|
||||||
|
|
||||||
ServerMetricsImpl(ServerName serverName, int versionNumber, long requestCountPerSecond,
|
ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
|
||||||
long requestCount, Size usedHeapSize, Size maxHeapSize, int infoServerPort,
|
long requestCountPerSecond, long requestCount, Size usedHeapSize, Size maxHeapSize,
|
||||||
List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
|
int infoServerPort, List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
|
||||||
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
|
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
|
||||||
long lastReportTimestamp) {
|
long lastReportTimestamp) {
|
||||||
this.serverName = Preconditions.checkNotNull(serverName);
|
this.serverName = Preconditions.checkNotNull(serverName);
|
||||||
this.versionNumber = versionNumber;
|
this.versionNumber = versionNumber;
|
||||||
|
this.version = version;
|
||||||
this.requestCountPerSecond = requestCountPerSecond;
|
this.requestCountPerSecond = requestCountPerSecond;
|
||||||
this.requestCount = requestCount;
|
this.requestCount = requestCount;
|
||||||
this.usedHeapSize = Preconditions.checkNotNull(usedHeapSize);
|
this.usedHeapSize = Preconditions.checkNotNull(usedHeapSize);
|
||||||
|
@ -259,6 +267,10 @@ public final class ServerMetricsBuilder {
|
||||||
return versionNumber;
|
return versionNumber;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getVersion() {
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getRequestCountPerSecond() {
|
public long getRequestCountPerSecond() {
|
||||||
return requestCountPerSecond;
|
return requestCountPerSecond;
|
||||||
|
|
|
@ -70,18 +70,10 @@ public final class VersionInfoUtil {
|
||||||
/**
|
/**
|
||||||
* @return the versionInfo extracted from the current RpcCallContext
|
* @return the versionInfo extracted from the current RpcCallContext
|
||||||
*/
|
*/
|
||||||
private static HBaseProtos.VersionInfo getCurrentClientVersionInfo() {
|
public static HBaseProtos.VersionInfo getCurrentClientVersionInfo() {
|
||||||
return RpcServer.getCurrentCall().map(RpcCallContext::getClientVersionInfo).orElse(null);
|
return RpcServer.getCurrentCall().map(RpcCallContext::getClientVersionInfo).orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @return the version number extracted from the current RpcCallContext as int.
|
|
||||||
* (e.g. 0x0103004 is 1.3.4)
|
|
||||||
*/
|
|
||||||
public static int getCurrentClientVersionNumber() {
|
|
||||||
return getVersionNumber(getCurrentClientVersionInfo());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param version
|
* @param version
|
||||||
|
|
|
@ -88,7 +88,6 @@ import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.TableState;
|
import org.apache.hadoop.hbase.client.TableState;
|
||||||
import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
|
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
|
||||||
|
@ -2693,11 +2692,10 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getRegionServerVersion(final ServerName sn) {
|
public String getRegionServerVersion(ServerName sn) {
|
||||||
// Will return 0 if the server is not online to prevent move system region to unknown version
|
// Will return "0.0.0" if the server is not online to prevent move system region to unknown
|
||||||
// RS.
|
// version RS.
|
||||||
int versionNumber = this.serverManager.getServerVersion(sn);
|
return this.serverManager.getVersion(sn);
|
||||||
return VersionInfoUtil.versionNumberToString(versionNumber);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -118,6 +118,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
|
||||||
|
@ -448,22 +449,29 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RegionServerReportResponse regionServerReport(
|
public RegionServerReportResponse regionServerReport(RpcController controller,
|
||||||
RpcController controller, RegionServerReportRequest request) throws ServiceException {
|
RegionServerReportRequest request) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
master.checkServiceStarted();
|
master.checkServiceStarted();
|
||||||
int version = VersionInfoUtil.getCurrentClientVersionNumber();
|
int versionNumber = 0;
|
||||||
|
String version = "0.0.0";
|
||||||
|
VersionInfo versionInfo = VersionInfoUtil.getCurrentClientVersionInfo();
|
||||||
|
if (versionInfo != null) {
|
||||||
|
version = versionInfo.getVersion();
|
||||||
|
versionNumber = VersionInfoUtil.getVersionNumber(versionInfo);
|
||||||
|
}
|
||||||
ClusterStatusProtos.ServerLoad sl = request.getLoad();
|
ClusterStatusProtos.ServerLoad sl = request.getLoad();
|
||||||
ServerName serverName = ProtobufUtil.toServerName(request.getServer());
|
ServerName serverName = ProtobufUtil.toServerName(request.getServer());
|
||||||
ServerMetrics oldLoad = master.getServerManager().getLoad(serverName);
|
ServerMetrics oldLoad = master.getServerManager().getLoad(serverName);
|
||||||
ServerMetrics newLoad = ServerMetricsBuilder.toServerMetrics(serverName, version, sl);
|
ServerMetrics newLoad =
|
||||||
|
ServerMetricsBuilder.toServerMetrics(serverName, versionNumber, version, sl);
|
||||||
master.getServerManager().regionServerReport(serverName, newLoad);
|
master.getServerManager().regionServerReport(serverName, newLoad);
|
||||||
master.getAssignmentManager()
|
master.getAssignmentManager().reportOnlineRegions(serverName,
|
||||||
.reportOnlineRegions(serverName, newLoad.getRegionMetrics().keySet());
|
newLoad.getRegionMetrics().keySet());
|
||||||
if (sl != null && master.metricsMaster != null) {
|
if (sl != null && master.metricsMaster != null) {
|
||||||
// Up our metrics.
|
// Up our metrics.
|
||||||
master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests()
|
master.metricsMaster.incrementRequests(
|
||||||
- (oldLoad != null ? oldLoad.getRequestCount() : 0));
|
sl.getTotalNumberOfRequests() - (oldLoad != null ? oldLoad.getRequestCount() : 0));
|
||||||
}
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
throw new ServiceException(ioe);
|
throw new ServiceException(ioe);
|
||||||
|
@ -472,23 +480,28 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RegionServerStartupResponse regionServerStartup(
|
public RegionServerStartupResponse regionServerStartup(RpcController controller,
|
||||||
RpcController controller, RegionServerStartupRequest request) throws ServiceException {
|
RegionServerStartupRequest request) throws ServiceException {
|
||||||
// Register with server manager
|
// Register with server manager
|
||||||
try {
|
try {
|
||||||
master.checkServiceStarted();
|
master.checkServiceStarted();
|
||||||
int version = VersionInfoUtil.getCurrentClientVersionNumber();
|
int versionNumber = 0;
|
||||||
InetAddress ia = master.getRemoteInetAddress(
|
String version = "0.0.0";
|
||||||
request.getPort(), request.getServerStartCode());
|
VersionInfo versionInfo = VersionInfoUtil.getCurrentClientVersionInfo();
|
||||||
|
if (versionInfo != null) {
|
||||||
|
version = versionInfo.getVersion();
|
||||||
|
versionNumber = VersionInfoUtil.getVersionNumber(versionInfo);
|
||||||
|
}
|
||||||
|
InetAddress ia = master.getRemoteInetAddress(request.getPort(), request.getServerStartCode());
|
||||||
// if regionserver passed hostname to use,
|
// if regionserver passed hostname to use,
|
||||||
// then use it instead of doing a reverse DNS lookup
|
// then use it instead of doing a reverse DNS lookup
|
||||||
ServerName rs = master.getServerManager().regionServerStartup(request, version, ia);
|
ServerName rs =
|
||||||
|
master.getServerManager().regionServerStartup(request, versionNumber, version, ia);
|
||||||
|
|
||||||
// Send back some config info
|
// Send back some config info
|
||||||
RegionServerStartupResponse.Builder resp = createConfigurationSubset();
|
RegionServerStartupResponse.Builder resp = createConfigurationSubset();
|
||||||
NameStringPair.Builder entry = NameStringPair.newBuilder()
|
NameStringPair.Builder entry = NameStringPair.newBuilder()
|
||||||
.setName(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)
|
.setName(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER).setValue(rs.getHostname());
|
||||||
.setValue(rs.getHostname());
|
|
||||||
resp.addMapEntries(entry.build());
|
resp.addMapEntries(entry.build());
|
||||||
|
|
||||||
return resp.build();
|
return resp.build();
|
||||||
|
|
|
@ -127,10 +127,9 @@ public class RegionServerTracker extends ZKListener {
|
||||||
ServerName serverName = pair.getFirst();
|
ServerName serverName = pair.getFirst();
|
||||||
RegionServerInfo info = pair.getSecond();
|
RegionServerInfo info = pair.getSecond();
|
||||||
regionServers.add(serverName);
|
regionServers.add(serverName);
|
||||||
ServerMetrics serverMetrics = info != null
|
ServerMetrics serverMetrics = info != null ? ServerMetricsBuilder.of(serverName,
|
||||||
? ServerMetricsBuilder.of(serverName,
|
VersionInfoUtil.getVersionNumber(info.getVersionInfo()),
|
||||||
VersionInfoUtil.getVersionNumber(info.getVersionInfo()))
|
info.getVersionInfo().getVersion()) : ServerMetricsBuilder.of(serverName);
|
||||||
: ServerMetricsBuilder.of(serverName);
|
|
||||||
serverManager.checkAndRecordNewServer(serverName, serverMetrics);
|
serverManager.checkAndRecordNewServer(serverName, serverMetrics);
|
||||||
}
|
}
|
||||||
serverManager.findOutDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir);
|
serverManager.findOutDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir);
|
||||||
|
|
|
@ -177,30 +177,30 @@ public class ServerManager {
|
||||||
/**
|
/**
|
||||||
* Let the server manager know a new regionserver has come online
|
* Let the server manager know a new regionserver has come online
|
||||||
* @param request the startup request
|
* @param request the startup request
|
||||||
* @param versionNumber the version of the new regionserver
|
* @param versionNumber the version number of the new regionserver
|
||||||
|
* @param version the version of the new regionserver, could contain strings like "SNAPSHOT"
|
||||||
* @param ia the InetAddress from which request is received
|
* @param ia the InetAddress from which request is received
|
||||||
* @return The ServerName we know this server as.
|
* @return The ServerName we know this server as.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
|
ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
|
||||||
InetAddress ia) throws IOException {
|
String version, InetAddress ia) throws IOException {
|
||||||
// Test for case where we get a region startup message from a regionserver
|
// Test for case where we get a region startup message from a regionserver
|
||||||
// that has been quickly restarted but whose znode expiration handler has
|
// that has been quickly restarted but whose znode expiration handler has
|
||||||
// not yet run, or from a server whose fail we are currently processing.
|
// not yet run, or from a server whose fail we are currently processing.
|
||||||
// Test its host+port combo is present in serverAddressToServerInfo. If it
|
// Test its host+port combo is present in serverAddressToServerInfo. If it
|
||||||
// is, reject the server and trigger its expiration. The next time it comes
|
// is, reject the server and trigger its expiration. The next time it comes
|
||||||
// in, it should have been removed from serverAddressToServerInfo and queued
|
// in, it should have been removed from serverAddressToServerInfo and queued
|
||||||
// for processing by ProcessServerShutdown.
|
// for processing by ProcessServerShutdown.
|
||||||
|
|
||||||
final String hostname = request.hasUseThisHostnameInstead() ?
|
final String hostname =
|
||||||
request.getUseThisHostnameInstead() :ia.getHostName();
|
request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : ia.getHostName();
|
||||||
ServerName sn = ServerName.valueOf(hostname, request.getPort(),
|
ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
|
||||||
request.getServerStartCode());
|
|
||||||
checkClockSkew(sn, request.getServerCurrentTime());
|
checkClockSkew(sn, request.getServerCurrentTime());
|
||||||
checkIsDead(sn, "STARTUP");
|
checkIsDead(sn, "STARTUP");
|
||||||
if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber))) {
|
if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
|
||||||
LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
|
LOG.warn(
|
||||||
+ " could not record the server: " + sn);
|
"THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
|
||||||
}
|
}
|
||||||
return sn;
|
return sn;
|
||||||
}
|
}
|
||||||
|
@ -949,11 +949,19 @@ public class ServerManager {
|
||||||
/**
|
/**
|
||||||
* May return 0 when server is not online.
|
* May return 0 when server is not online.
|
||||||
*/
|
*/
|
||||||
public int getServerVersion(final ServerName serverName) {
|
public int getVersionNumber(ServerName serverName) {
|
||||||
ServerMetrics serverMetrics = onlineServers.get(serverName);
|
ServerMetrics serverMetrics = onlineServers.get(serverName);
|
||||||
return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
|
return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* May return "0.0.0" when server is not online
|
||||||
|
*/
|
||||||
|
public String getVersion(ServerName serverName) {
|
||||||
|
ServerMetrics serverMetrics = onlineServers.get(serverName);
|
||||||
|
return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
|
||||||
|
}
|
||||||
|
|
||||||
public int getInfoPort(ServerName serverName) {
|
public int getInfoPort(ServerName serverName) {
|
||||||
ServerMetrics serverMetrics = onlineServers.get(serverName);
|
ServerMetrics serverMetrics = onlineServers.get(serverName);
|
||||||
return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
|
return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
|
||||||
|
|
|
@ -898,7 +898,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
|
master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
|
||||||
|
|
||||||
// If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
|
// If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
|
||||||
if (master.getServerManager().getServerVersion(serverName) < 0x0200000) {
|
if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
|
||||||
throw new UnsupportedOperationException(String.format(
|
throw new UnsupportedOperationException(String.format(
|
||||||
"Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB));
|
"Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB));
|
||||||
}
|
}
|
||||||
|
@ -921,7 +921,7 @@ public class AssignmentManager implements ServerListener {
|
||||||
master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
|
master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
|
||||||
|
|
||||||
// If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
|
// If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
|
||||||
if (master.getServerManager().getServerVersion(serverName) < 0x0200000) {
|
if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
|
||||||
throw new UnsupportedOperationException(String.format(
|
throw new UnsupportedOperationException(String.format(
|
||||||
"Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA,
|
"Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA,
|
||||||
hriB));
|
hriB));
|
||||||
|
|
|
@ -108,7 +108,7 @@ public class RSProcedureDispatcher
|
||||||
@Override
|
@Override
|
||||||
protected void remoteDispatch(final ServerName serverName,
|
protected void remoteDispatch(final ServerName serverName,
|
||||||
final Set<RemoteProcedure> remoteProcedures) {
|
final Set<RemoteProcedure> remoteProcedures) {
|
||||||
final int rsVersion = master.getServerManager().getServerVersion(serverName);
|
final int rsVersion = master.getServerManager().getVersionNumber(serverName);
|
||||||
if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) {
|
if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) {
|
||||||
LOG.trace("Using procedure batch rpc execution for serverName={} version={}", serverName,
|
LOG.trace("Using procedure batch rpc execution for serverName={} version={}", serverName,
|
||||||
rsVersion);
|
rsVersion);
|
||||||
|
|
|
@ -66,7 +66,7 @@ public class TestClockSkewDetection {
|
||||||
request.setPort(1234);
|
request.setPort(1234);
|
||||||
request.setServerStartCode(-1);
|
request.setServerStartCode(-1);
|
||||||
request.setServerCurrentTime(System.currentTimeMillis());
|
request.setServerCurrentTime(System.currentTimeMillis());
|
||||||
sm.regionServerStartup(request.build(), 0, ia1);
|
sm.regionServerStartup(request.build(), 0, "0.0.0", ia1);
|
||||||
|
|
||||||
final Configuration c = HBaseConfiguration.create();
|
final Configuration c = HBaseConfiguration.create();
|
||||||
long maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
|
long maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
|
||||||
|
@ -81,7 +81,7 @@ public class TestClockSkewDetection {
|
||||||
request.setPort(1235);
|
request.setPort(1235);
|
||||||
request.setServerStartCode(-1);
|
request.setServerStartCode(-1);
|
||||||
request.setServerCurrentTime(System.currentTimeMillis() - maxSkew * 2);
|
request.setServerCurrentTime(System.currentTimeMillis() - maxSkew * 2);
|
||||||
sm.regionServerStartup(request.build(), 0, ia2);
|
sm.regionServerStartup(request.build(), 0, "0.0.0", ia2);
|
||||||
fail("HMaster should have thrown a ClockOutOfSyncException but didn't.");
|
fail("HMaster should have thrown a ClockOutOfSyncException but didn't.");
|
||||||
} catch(ClockOutOfSyncException e) {
|
} catch(ClockOutOfSyncException e) {
|
||||||
//we want an exception
|
//we want an exception
|
||||||
|
@ -97,7 +97,7 @@ public class TestClockSkewDetection {
|
||||||
request.setPort(1236);
|
request.setPort(1236);
|
||||||
request.setServerStartCode(-1);
|
request.setServerStartCode(-1);
|
||||||
request.setServerCurrentTime(System.currentTimeMillis() + maxSkew * 2);
|
request.setServerCurrentTime(System.currentTimeMillis() + maxSkew * 2);
|
||||||
sm.regionServerStartup(request.build(), 0, ia3);
|
sm.regionServerStartup(request.build(), 0, "0.0.0", ia3);
|
||||||
fail("HMaster should have thrown a ClockOutOfSyncException but didn't.");
|
fail("HMaster should have thrown a ClockOutOfSyncException but didn't.");
|
||||||
} catch (ClockOutOfSyncException e) {
|
} catch (ClockOutOfSyncException e) {
|
||||||
// we want an exception
|
// we want an exception
|
||||||
|
@ -111,7 +111,7 @@ public class TestClockSkewDetection {
|
||||||
request.setPort(1237);
|
request.setPort(1237);
|
||||||
request.setServerStartCode(-1);
|
request.setServerStartCode(-1);
|
||||||
request.setServerCurrentTime(System.currentTimeMillis() - warningSkew * 2);
|
request.setServerCurrentTime(System.currentTimeMillis() - warningSkew * 2);
|
||||||
sm.regionServerStartup(request.build(), 0, ia4);
|
sm.regionServerStartup(request.build(), 0, "0.0.0", ia4);
|
||||||
|
|
||||||
// make sure values above warning threshold but below max threshold don't kill
|
// make sure values above warning threshold but below max threshold don't kill
|
||||||
LOG.debug("regionServerStartup 5");
|
LOG.debug("regionServerStartup 5");
|
||||||
|
@ -120,6 +120,6 @@ public class TestClockSkewDetection {
|
||||||
request.setPort(1238);
|
request.setPort(1238);
|
||||||
request.setServerStartCode(-1);
|
request.setServerStartCode(-1);
|
||||||
request.setServerCurrentTime(System.currentTimeMillis() + warningSkew * 2);
|
request.setServerCurrentTime(System.currentTimeMillis() + warningSkew * 2);
|
||||||
sm.regionServerStartup(request.build(), 0, ia5);
|
sm.regionServerStartup(request.build(), 0, "0.0.0", ia5);
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -234,7 +234,7 @@ public class TestRestartCluster {
|
||||||
while (t.isAlive()) {
|
while (t.isAlive()) {
|
||||||
List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
|
List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
|
||||||
for (ServerName serverName : serverNames) {
|
for (ServerName serverName : serverNames) {
|
||||||
assertNotEquals(0, master.getServerManager().getServerVersion(serverName));
|
assertNotEquals(0, master.getServerManager().getVersionNumber(serverName));
|
||||||
}
|
}
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue