HBASE-20722 Make RegionServerTracker only depend on children changed event
This commit is contained in:
parent
ec66434380
commit
423a0ab71a
|
@ -102,7 +102,7 @@ public final class VersionInfoUtil {
|
||||||
* @param versionInfo the VersionInfo object to pack
|
* @param versionInfo the VersionInfo object to pack
|
||||||
* @return the version number as int. (e.g. 0x0103004 is 1.3.4)
|
* @return the version number as int. (e.g. 0x0103004 is 1.3.4)
|
||||||
*/
|
*/
|
||||||
private static int getVersionNumber(final HBaseProtos.VersionInfo versionInfo) {
|
public static int getVersionNumber(final HBaseProtos.VersionInfo versionInfo) {
|
||||||
if (versionInfo != null) {
|
if (versionInfo != null) {
|
||||||
try {
|
try {
|
||||||
final String[] components = getVersionComponents(versionInfo);
|
final String[] components = getVersionComponents(versionInfo);
|
||||||
|
|
|
@ -72,7 +72,6 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||||
import org.apache.hadoop.hbase.PleaseHoldException;
|
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||||
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
||||||
import org.apache.hadoop.hbase.ScheduledChore;
|
import org.apache.hadoop.hbase.ScheduledChore;
|
||||||
import org.apache.hadoop.hbase.ServerMetricsBuilder;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableDescriptors;
|
import org.apache.hadoop.hbase.TableDescriptors;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
@ -87,6 +86,7 @@ import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.TableState;
|
import org.apache.hadoop.hbase.client.TableState;
|
||||||
|
import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
|
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
|
||||||
|
@ -212,7 +212,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
|
||||||
|
@ -297,7 +296,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
// Manager and zk listener for master election
|
// Manager and zk listener for master election
|
||||||
private final ActiveMasterManager activeMasterManager;
|
private final ActiveMasterManager activeMasterManager;
|
||||||
// Region server tracker
|
// Region server tracker
|
||||||
RegionServerTracker regionServerTracker;
|
private RegionServerTracker regionServerTracker;
|
||||||
// Draining region server tracker
|
// Draining region server tracker
|
||||||
private DrainingServerTracker drainingServerTracker;
|
private DrainingServerTracker drainingServerTracker;
|
||||||
// Tracker for load balancer state
|
// Tracker for load balancer state
|
||||||
|
@ -725,10 +724,16 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* <p>
|
||||||
* Initialize all ZK based system trackers.
|
* Initialize all ZK based system trackers.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* Will be overridden in tests.
|
||||||
|
* </p>
|
||||||
*/
|
*/
|
||||||
void initializeZKBasedSystemTrackers() throws IOException, InterruptedException, KeeperException,
|
@VisibleForTesting
|
||||||
ReplicationException {
|
protected void initializeZKBasedSystemTrackers()
|
||||||
|
throws IOException, InterruptedException, KeeperException, ReplicationException {
|
||||||
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
|
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
|
||||||
this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
|
this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
|
||||||
this.normalizer.setMasterServices(this);
|
this.normalizer.setMasterServices(this);
|
||||||
|
@ -1070,9 +1075,15 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* <p>
|
||||||
* Create a {@link ServerManager} instance.
|
* Create a {@link ServerManager} instance.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* Will be overridden in tests.
|
||||||
|
* </p>
|
||||||
*/
|
*/
|
||||||
ServerManager createServerManager(final MasterServices master) throws IOException {
|
@VisibleForTesting
|
||||||
|
protected ServerManager createServerManager(final MasterServices master) throws IOException {
|
||||||
// We put this out here in a method so can do a Mockito.spy and stub it out
|
// We put this out here in a method so can do a Mockito.spy and stub it out
|
||||||
// w/ a mocked up ServerManager.
|
// w/ a mocked up ServerManager.
|
||||||
setupClusterConnection();
|
setupClusterConnection();
|
||||||
|
@ -1082,17 +1093,11 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
private void waitForRegionServers(final MonitoredTask status)
|
private void waitForRegionServers(final MonitoredTask status)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
this.serverManager.waitForRegionServers(status);
|
this.serverManager.waitForRegionServers(status);
|
||||||
// Check zk for region servers that are up but didn't register
|
|
||||||
for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
|
|
||||||
// The isServerOnline check is opportunistic, correctness is handled inside
|
|
||||||
if (!this.serverManager.isServerOnline(sn) &&
|
|
||||||
serverManager.checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn))) {
|
|
||||||
LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void initClusterSchemaService() throws IOException, InterruptedException {
|
// Will be overridden in tests
|
||||||
|
@VisibleForTesting
|
||||||
|
protected void initClusterSchemaService() throws IOException, InterruptedException {
|
||||||
this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
|
this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
|
||||||
this.clusterSchemaService.startAsync();
|
this.clusterSchemaService.startAsync();
|
||||||
try {
|
try {
|
||||||
|
@ -1104,14 +1109,14 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void initQuotaManager() throws IOException {
|
private void initQuotaManager() throws IOException {
|
||||||
MasterQuotaManager quotaManager = new MasterQuotaManager(this);
|
MasterQuotaManager quotaManager = new MasterQuotaManager(this);
|
||||||
this.assignmentManager.setRegionStateListener(quotaManager);
|
this.assignmentManager.setRegionStateListener(quotaManager);
|
||||||
quotaManager.start();
|
quotaManager.start();
|
||||||
this.quotaManager = quotaManager;
|
this.quotaManager = quotaManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
SpaceQuotaSnapshotNotifier createQuotaSnapshotNotifier() {
|
private SpaceQuotaSnapshotNotifier createQuotaSnapshotNotifier() {
|
||||||
SpaceQuotaSnapshotNotifier notifier =
|
SpaceQuotaSnapshotNotifier notifier =
|
||||||
SpaceQuotaSnapshotNotifierFactory.getInstance().create(getConfiguration());
|
SpaceQuotaSnapshotNotifierFactory.getInstance().create(getConfiguration());
|
||||||
return notifier;
|
return notifier;
|
||||||
|
@ -1207,8 +1212,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
.getFileSystem(), archiveDir, params);
|
.getFileSystem(), archiveDir, params);
|
||||||
getChoreService().scheduleChore(hfileCleaner);
|
getChoreService().scheduleChore(hfileCleaner);
|
||||||
|
|
||||||
replicationBarrierCleaner =
|
replicationBarrierCleaner = new ReplicationBarrierCleaner(conf, this, getConnection(),
|
||||||
new ReplicationBarrierCleaner(conf, this, getConnection(), replicationPeerManager);
|
replicationPeerManager);
|
||||||
getChoreService().scheduleChore(replicationBarrierCleaner);
|
getChoreService().scheduleChore(replicationBarrierCleaner);
|
||||||
|
|
||||||
serviceStarted = true;
|
serviceStarted = true;
|
||||||
|
@ -1263,6 +1268,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
if (this.mpmHost != null) {
|
if (this.mpmHost != null) {
|
||||||
this.mpmHost.stop("server shutting down.");
|
this.mpmHost.stop("server shutting down.");
|
||||||
}
|
}
|
||||||
|
if (this.regionServerTracker != null) {
|
||||||
|
this.regionServerTracker.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startProcedureExecutor() throws IOException {
|
private void startProcedureExecutor() throws IOException {
|
||||||
|
@ -2626,21 +2634,17 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getRegionServerInfoPort(final ServerName sn) {
|
public int getRegionServerInfoPort(final ServerName sn) {
|
||||||
RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
|
int port = this.serverManager.getInfoPort(sn);
|
||||||
if (info == null || info.getInfoPort() == 0) {
|
return port == 0 ? conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
|
||||||
return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
|
HConstants.DEFAULT_REGIONSERVER_INFOPORT) : port;
|
||||||
HConstants.DEFAULT_REGIONSERVER_INFOPORT);
|
|
||||||
}
|
|
||||||
return info.getInfoPort();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getRegionServerVersion(final ServerName sn) {
|
public String getRegionServerVersion(final ServerName sn) {
|
||||||
RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
|
// Will return 0 if the server is not online to prevent move system region to unknown version
|
||||||
if (info != null && info.hasVersionInfo()) {
|
// RS.
|
||||||
return info.getVersionInfo().getVersion();
|
int versionNumber = this.serverManager.getServerVersion(sn);
|
||||||
}
|
return VersionInfoUtil.versionNumberToString(versionNumber);
|
||||||
return "0.0.0"; //Lowest version to prevent move system region to unknown version RS.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -20,155 +19,167 @@ package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.util.ArrayList;
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NavigableMap;
|
import java.util.Set;
|
||||||
import java.util.TreeMap;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import org.apache.hadoop.hbase.ServerMetrics;
|
||||||
|
import org.apache.hadoop.hbase.ServerMetricsBuilder;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKListener;
|
import org.apache.hadoop.hbase.zookeeper.ZKListener;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* <p>
|
||||||
* Tracks the online region servers via ZK.
|
* Tracks the online region servers via ZK.
|
||||||
*
|
* </p>
|
||||||
* <p>Handling of new RSs checking in is done via RPC. This class
|
* <p>
|
||||||
* is only responsible for watching for expired nodes. It handles
|
* Handling of new RSs checking in is done via RPC. This class is only responsible for watching for
|
||||||
* listening for changes in the RS node list and watching each node.
|
* expired nodes. It handles listening for changes in the RS node list. The only exception is when
|
||||||
*
|
* master restart, we will use the list fetched from zk to construct the initial set of live region
|
||||||
* <p>If an RS node gets deleted, this automatically handles calling of
|
* servers.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* If an RS node gets deleted, this automatically handles calling of
|
||||||
* {@link ServerManager#expireServer(ServerName)}
|
* {@link ServerManager#expireServer(ServerName)}
|
||||||
|
* </p>
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class RegionServerTracker extends ZKListener {
|
public class RegionServerTracker extends ZKListener {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(RegionServerTracker.class);
|
private static final Logger LOG = LoggerFactory.getLogger(RegionServerTracker.class);
|
||||||
private final NavigableMap<ServerName, RegionServerInfo> regionServers = new TreeMap<>();
|
private final Set<ServerName> regionServers = new HashSet<>();
|
||||||
private ServerManager serverManager;
|
private final ServerManager serverManager;
|
||||||
private MasterServices server;
|
private final MasterServices server;
|
||||||
|
// As we need to send request to zk when processing the nodeChildrenChanged event, we'd better
|
||||||
|
// move the operation to a single threaded thread pool in order to not block the zk event
|
||||||
|
// processing since all the zk listener across HMaster will be called in one thread sequentially.
|
||||||
|
private final ExecutorService executor;
|
||||||
|
|
||||||
public RegionServerTracker(ZKWatcher watcher,
|
public RegionServerTracker(ZKWatcher watcher, MasterServices server,
|
||||||
MasterServices server, ServerManager serverManager) {
|
ServerManager serverManager) {
|
||||||
super(watcher);
|
super(watcher);
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.serverManager = serverManager;
|
this.serverManager = serverManager;
|
||||||
|
executor = Executors.newSingleThreadExecutor(
|
||||||
|
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Pair<ServerName, RegionServerInfo> getServerInfo(String name)
|
||||||
|
throws KeeperException, IOException {
|
||||||
|
ServerName serverName = ServerName.parseServerName(name);
|
||||||
|
String nodePath = ZNodePaths.joinZNode(watcher.getZNodePaths().rsZNode, name);
|
||||||
|
byte[] data;
|
||||||
|
try {
|
||||||
|
data = ZKUtil.getData(watcher, nodePath);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
|
||||||
|
}
|
||||||
|
if (data == null) {
|
||||||
|
// we should receive a children changed event later and then we will expire it, so we still
|
||||||
|
// need to add it to the region server set.
|
||||||
|
LOG.warn("Server node {} does not exist, already dead?", name);
|
||||||
|
return Pair.newPair(serverName, null);
|
||||||
|
}
|
||||||
|
if (data.length == 0 || !ProtobufUtil.isPBMagicPrefix(data)) {
|
||||||
|
// this should not happen actually, unless we have bugs or someone has messed zk up.
|
||||||
|
LOG.warn("Invalid data for region server node {} on zookeeper, data length = {}", name,
|
||||||
|
data.length);
|
||||||
|
return Pair.newPair(serverName, null);
|
||||||
|
}
|
||||||
|
RegionServerInfo.Builder builder = RegionServerInfo.newBuilder();
|
||||||
|
int magicLen = ProtobufUtil.lengthOfPBMagic();
|
||||||
|
ProtobufUtil.mergeFrom(builder, data, magicLen, data.length - magicLen);
|
||||||
|
return Pair.newPair(serverName, builder.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* <p>
|
||||||
* Starts the tracking of online RegionServers.
|
* Starts the tracking of online RegionServers.
|
||||||
*
|
* </p>
|
||||||
* <p>All RSs will be tracked after this method is called.
|
* <p>
|
||||||
*
|
* All RSs will be tracked after this method is called.
|
||||||
* @throws KeeperException
|
* </p>
|
||||||
* @throws IOException
|
|
||||||
*/
|
*/
|
||||||
public void start() throws KeeperException, IOException {
|
public void start() throws KeeperException, IOException {
|
||||||
watcher.registerListener(this);
|
watcher.registerListener(this);
|
||||||
List<String> servers =
|
synchronized (this) {
|
||||||
ZKUtil.listChildrenAndWatchThem(watcher, watcher.getZNodePaths().rsZNode);
|
List<String> servers =
|
||||||
refresh(servers);
|
ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
|
||||||
}
|
for (String n : servers) {
|
||||||
|
Pair<ServerName, RegionServerInfo> pair = getServerInfo(n);
|
||||||
private void refresh(final List<String> servers) throws IOException {
|
ServerName serverName = pair.getFirst();
|
||||||
synchronized(this.regionServers) {
|
RegionServerInfo info = pair.getSecond();
|
||||||
this.regionServers.clear();
|
regionServers.add(serverName);
|
||||||
for (String n: servers) {
|
ServerMetrics serverMetrics = info != null
|
||||||
ServerName sn = ServerName.parseServerName(ZKUtil.getNodeName(n));
|
? ServerMetricsBuilder.of(serverName,
|
||||||
if (regionServers.get(sn) == null) {
|
VersionInfoUtil.getVersionNumber(info.getVersionInfo()))
|
||||||
RegionServerInfo.Builder rsInfoBuilder = RegionServerInfo.newBuilder();
|
: ServerMetricsBuilder.of(serverName);
|
||||||
try {
|
serverManager.checkAndRecordNewServer(serverName, serverMetrics);
|
||||||
String nodePath = ZNodePaths.joinZNode(watcher.getZNodePaths().rsZNode, n);
|
|
||||||
byte[] data = ZKUtil.getData(watcher, nodePath);
|
|
||||||
if (data != null && data.length > 0 && ProtobufUtil.isPBMagicPrefix(data)) {
|
|
||||||
int magicLen = ProtobufUtil.lengthOfPBMagic();
|
|
||||||
ProtobufUtil.mergeFrom(rsInfoBuilder, data, magicLen, data.length - magicLen);
|
|
||||||
}
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Added tracking of RS " + nodePath);
|
|
||||||
}
|
|
||||||
} catch (KeeperException e) {
|
|
||||||
LOG.warn("Get Rs info port from ephemeral node", e);
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Illegal data from ephemeral node", e);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new InterruptedIOException();
|
|
||||||
}
|
|
||||||
this.regionServers.put(sn, rsInfoBuilder.build());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void remove(final ServerName sn) {
|
public void stop() {
|
||||||
synchronized(this.regionServers) {
|
executor.shutdownNow();
|
||||||
this.regionServers.remove(sn);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private synchronized void refresh() {
|
||||||
public void nodeCreated(String path) {
|
List<String> names;
|
||||||
if (path.startsWith(watcher.getZNodePaths().rsZNode)) {
|
try {
|
||||||
String serverName = ZKUtil.getNodeName(path);
|
names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
|
||||||
LOG.info("RegionServer ephemeral node created, adding [" + serverName + "]");
|
} catch (KeeperException e) {
|
||||||
if (server.isInitialized()) {
|
// here we need to abort as we failed to set watcher on the rs node which means that we can
|
||||||
// Only call the check to move servers if a RegionServer was added to the cluster; in this
|
// not track the node deleted evetnt any more.
|
||||||
// case it could be a server with a new version so it makes sense to run the check.
|
server.abort("Unexpected zk exception getting RS nodes", e);
|
||||||
server.checkIfShouldMoveSystemRegionAsync();
|
return;
|
||||||
|
}
|
||||||
|
Set<ServerName> servers =
|
||||||
|
names.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
|
||||||
|
for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
|
||||||
|
ServerName sn = iter.next();
|
||||||
|
if (!servers.contains(sn)) {
|
||||||
|
LOG.info("RegionServer ephemeral node deleted, processing expiration [{}]", sn);
|
||||||
|
serverManager.expireServer(sn);
|
||||||
|
iter.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
// here we do not need to parse the region server info as it is useless now, we only need the
|
||||||
|
// server name.
|
||||||
@Override
|
boolean newServerAdded = false;
|
||||||
public void nodeDeleted(String path) {
|
for (ServerName sn : servers) {
|
||||||
if (path.startsWith(watcher.getZNodePaths().rsZNode)) {
|
if (regionServers.add(sn)) {
|
||||||
String serverName = ZKUtil.getNodeName(path);
|
newServerAdded = true;
|
||||||
LOG.info("RegionServer ephemeral node deleted, processing expiration [" +
|
LOG.info("RegionServer ephemeral node created, adding [" + sn + "]");
|
||||||
serverName + "]");
|
|
||||||
ServerName sn = ServerName.parseServerName(serverName);
|
|
||||||
if (!serverManager.isServerOnline(sn)) {
|
|
||||||
LOG.warn(serverName.toString() + " is not online or isn't known to the master."+
|
|
||||||
"The latter could be caused by a DNS misconfiguration.");
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
remove(sn);
|
}
|
||||||
this.serverManager.expireServer(sn);
|
if (newServerAdded && server.isInitialized()) {
|
||||||
|
// Only call the check to move servers if a RegionServer was added to the cluster; in this
|
||||||
|
// case it could be a server with a new version so it makes sense to run the check.
|
||||||
|
server.checkIfShouldMoveSystemRegionAsync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void nodeChildrenChanged(String path) {
|
public void nodeChildrenChanged(String path) {
|
||||||
if (path.equals(watcher.getZNodePaths().rsZNode)
|
if (path.equals(watcher.getZNodePaths().rsZNode) && !server.isAborted() &&
|
||||||
&& !server.isAborted() && !server.isStopped()) {
|
!server.isStopped()) {
|
||||||
try {
|
executor.execute(this::refresh);
|
||||||
List<String> servers =
|
|
||||||
ZKUtil.listChildrenAndWatchThem(watcher, watcher.getZNodePaths().rsZNode);
|
|
||||||
refresh(servers);
|
|
||||||
} catch (IOException e) {
|
|
||||||
server.abort("Unexpected zk exception getting RS nodes", e);
|
|
||||||
} catch (KeeperException e) {
|
|
||||||
server.abort("Unexpected zk exception getting RS nodes", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public RegionServerInfo getRegionServerInfo(final ServerName sn) {
|
|
||||||
return regionServers.get(sn);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the online servers.
|
|
||||||
* @return list of online servers
|
|
||||||
*/
|
|
||||||
public List<ServerName> getOnlineServers() {
|
|
||||||
synchronized (this.regionServers) {
|
|
||||||
return new ArrayList<>(this.regionServers.keySet());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.ServerMetrics;
|
||||||
import org.apache.hadoop.hbase.ServerMetricsBuilder;
|
import org.apache.hadoop.hbase.ServerMetricsBuilder;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.YouAreDeadException;
|
import org.apache.hadoop.hbase.YouAreDeadException;
|
||||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
|
||||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||||
|
@ -187,19 +186,13 @@ public class ServerManager {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
* @param master
|
|
||||||
* @throws ZooKeeperConnectionException
|
|
||||||
*/
|
*/
|
||||||
public ServerManager(final MasterServices master) {
|
public ServerManager(final MasterServices master) {
|
||||||
this(master, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
ServerManager(final MasterServices master, final boolean connect) {
|
|
||||||
this.master = master;
|
this.master = master;
|
||||||
Configuration c = master.getConfiguration();
|
Configuration c = master.getConfiguration();
|
||||||
maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
|
maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
|
||||||
warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
|
warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
|
||||||
this.connection = connect? master.getClusterConnection(): null;
|
this.connection = master.getClusterConnection();
|
||||||
this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory();
|
this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,8 +221,7 @@ public class ServerManager {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
|
ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
|
||||||
InetAddress ia)
|
InetAddress ia) throws IOException {
|
||||||
throws IOException {
|
|
||||||
// Test for case where we get a region startup message from a regionserver
|
// Test for case where we get a region startup message from a regionserver
|
||||||
// that has been quickly restarted but whose znode expiration handler has
|
// that has been quickly restarted but whose znode expiration handler has
|
||||||
// not yet run, or from a server whose fail we are currently processing.
|
// not yet run, or from a server whose fail we are currently processing.
|
||||||
|
@ -1068,4 +1060,9 @@ public class ServerManager {
|
||||||
ServerMetrics serverMetrics = onlineServers.get(serverName);
|
ServerMetrics serverMetrics = onlineServers.get(serverName);
|
||||||
return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
|
return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getInfoPort(ServerName serverName) {
|
||||||
|
ServerMetrics serverMetrics = onlineServers.get(serverName);
|
||||||
|
return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,18 +20,11 @@ package org.apache.hadoop.hbase.master;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
import org.apache.hadoop.hbase.ServerLoad;
|
|
||||||
import org.apache.hadoop.hbase.ServerMetricsBuilder;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
@ -45,18 +38,13 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
import org.mockito.Mockito;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -74,16 +62,6 @@ public class TestAssignmentListener {
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
|
|
||||||
private static final Abortable abortable = new Abortable() {
|
|
||||||
@Override
|
|
||||||
public boolean isAborted() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void abort(String why, Throwable e) {
|
|
||||||
}
|
|
||||||
};
|
|
||||||
static class DummyListener {
|
static class DummyListener {
|
||||||
protected AtomicInteger modified = new AtomicInteger(0);
|
protected AtomicInteger modified = new AtomicInteger(0);
|
||||||
|
|
||||||
|
@ -284,7 +262,7 @@ public class TestAssignmentListener {
|
||||||
admin.mergeRegionsAsync(regions.get(0).getEncodedNameAsBytes(),
|
admin.mergeRegionsAsync(regions.get(0).getEncodedNameAsBytes(),
|
||||||
regions.get(1).getEncodedNameAsBytes(), true);
|
regions.get(1).getEncodedNameAsBytes(), true);
|
||||||
listener.awaitModifications(expectedModifications);
|
listener.awaitModifications(expectedModifications);
|
||||||
assertEquals(1, admin.getTableRegions(tableName).size());
|
assertEquals(1, admin.getRegions(tableName).size());
|
||||||
assertEquals(expectedLoadCount, listener.getLoadCount()); // new merged region added
|
assertEquals(expectedLoadCount, listener.getLoadCount()); // new merged region added
|
||||||
assertEquals(expectedCloseCount, listener.getCloseCount()); // daughters removed
|
assertEquals(expectedCloseCount, listener.getCloseCount()); // daughters removed
|
||||||
|
|
||||||
|
@ -313,78 +291,4 @@ public class TestAssignmentListener {
|
||||||
}
|
}
|
||||||
return serverCount == 1;
|
return serverCount == 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testAddNewServerThatExistsInDraining() throws Exception {
|
|
||||||
// Under certain circumstances, such as when we failover to the Backup
|
|
||||||
// HMaster, the DrainingServerTracker is started with existing servers in
|
|
||||||
// draining before all of the Region Servers register with the
|
|
||||||
// ServerManager as "online". This test is to ensure that Region Servers
|
|
||||||
// are properly added to the ServerManager.drainingServers when they
|
|
||||||
// register with the ServerManager under these circumstances.
|
|
||||||
Configuration conf = TEST_UTIL.getConfiguration();
|
|
||||||
ZKWatcher zooKeeper = new ZKWatcher(conf,
|
|
||||||
"zkWatcher-NewServerDrainTest", abortable, true);
|
|
||||||
String baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
|
|
||||||
HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
|
|
||||||
String drainingZNode = ZNodePaths.joinZNode(baseZNode,
|
|
||||||
conf.get("zookeeper.znode.draining.rs", "draining"));
|
|
||||||
|
|
||||||
HMaster master = Mockito.mock(HMaster.class);
|
|
||||||
Mockito.when(master.getConfiguration()).thenReturn(conf);
|
|
||||||
|
|
||||||
ServerName SERVERNAME_A = ServerName.valueOf("mockserverbulk_a.org", 1000, 8000);
|
|
||||||
ServerName SERVERNAME_B = ServerName.valueOf("mockserverbulk_b.org", 1001, 8000);
|
|
||||||
ServerName SERVERNAME_C = ServerName.valueOf("mockserverbulk_c.org", 1002, 8000);
|
|
||||||
|
|
||||||
// We'll start with 2 servers in draining that existed before the
|
|
||||||
// HMaster started.
|
|
||||||
ArrayList<ServerName> drainingServers = new ArrayList<>();
|
|
||||||
drainingServers.add(SERVERNAME_A);
|
|
||||||
drainingServers.add(SERVERNAME_B);
|
|
||||||
|
|
||||||
// We'll have 2 servers that come online AFTER the DrainingServerTracker
|
|
||||||
// is started (just as we see when we failover to the Backup HMaster).
|
|
||||||
// One of these will already be a draining server.
|
|
||||||
HashMap<ServerName, ServerLoad> onlineServers = new HashMap<>();
|
|
||||||
onlineServers.put(SERVERNAME_A, new ServerLoad(ServerMetricsBuilder.of(SERVERNAME_A)));
|
|
||||||
onlineServers.put(SERVERNAME_C, new ServerLoad(ServerMetricsBuilder.of(SERVERNAME_C)));
|
|
||||||
|
|
||||||
// Create draining znodes for the draining servers, which would have been
|
|
||||||
// performed when the previous HMaster was running.
|
|
||||||
for (ServerName sn : drainingServers) {
|
|
||||||
String znode = ZNodePaths.joinZNode(drainingZNode, sn.getServerName());
|
|
||||||
ZKUtil.createAndFailSilent(zooKeeper, znode);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now, we follow the same order of steps that the HMaster does to setup
|
|
||||||
// the ServerManager, RegionServerTracker, and DrainingServerTracker.
|
|
||||||
ServerManager serverManager = new ServerManager(master);
|
|
||||||
|
|
||||||
RegionServerTracker regionServerTracker = new RegionServerTracker(
|
|
||||||
zooKeeper, master, serverManager);
|
|
||||||
regionServerTracker.start();
|
|
||||||
|
|
||||||
DrainingServerTracker drainingServerTracker = new DrainingServerTracker(
|
|
||||||
zooKeeper, master, serverManager);
|
|
||||||
drainingServerTracker.start();
|
|
||||||
|
|
||||||
// Confirm our ServerManager lists are empty.
|
|
||||||
Assert.assertEquals(new HashMap<ServerName, ServerLoad>(), serverManager.getOnlineServers());
|
|
||||||
Assert.assertEquals(new ArrayList<ServerName>(), serverManager.getDrainingServersList());
|
|
||||||
|
|
||||||
// checkAndRecordNewServer() is how servers are added to the ServerManager.
|
|
||||||
ArrayList<ServerName> onlineDrainingServers = new ArrayList<>();
|
|
||||||
for (ServerName sn : onlineServers.keySet()){
|
|
||||||
// Here's the actual test.
|
|
||||||
serverManager.checkAndRecordNewServer(sn, onlineServers.get(sn));
|
|
||||||
if (drainingServers.contains(sn)){
|
|
||||||
onlineDrainingServers.add(sn); // keeping track for later verification
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify the ServerManager lists are correctly updated.
|
|
||||||
Assert.assertEquals(onlineServers, serverManager.getOnlineServers());
|
|
||||||
Assert.assertEquals(onlineDrainingServers, serverManager.getDrainingServersList());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,7 @@ public class TestClockSkewDetection {
|
||||||
when(conn.getRpcControllerFactory()).thenReturn(mock(RpcControllerFactory.class));
|
when(conn.getRpcControllerFactory()).thenReturn(mock(RpcControllerFactory.class));
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
}, true);
|
});
|
||||||
|
|
||||||
LOG.debug("regionServerStartup 1");
|
LOG.debug("regionServerStartup 1");
|
||||||
InetAddress ia1 = InetAddress.getLocalHost();
|
InetAddress ia1 = InetAddress.getLocalHost();
|
||||||
|
|
|
@ -23,8 +23,6 @@ import static org.junit.Assert.assertTrue;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
@ -203,10 +201,10 @@ public class TestMasterNoCluster {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void initClusterSchemaService() throws IOException, InterruptedException {}
|
protected void initClusterSchemaService() throws IOException, InterruptedException {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
ServerManager createServerManager(MasterServices master) throws IOException {
|
protected ServerManager createServerManager(MasterServices master) throws IOException {
|
||||||
ServerManager sm = super.createServerManager(master);
|
ServerManager sm = super.createServerManager(master);
|
||||||
// Spy on the created servermanager
|
// Spy on the created servermanager
|
||||||
ServerManager spy = Mockito.spy(sm);
|
ServerManager spy = Mockito.spy(sm);
|
||||||
|
@ -271,23 +269,15 @@ public class TestMasterNoCluster {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void initClusterSchemaService() throws IOException, InterruptedException {}
|
protected void initClusterSchemaService() throws IOException, InterruptedException {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void initializeZKBasedSystemTrackers() throws IOException, InterruptedException,
|
protected void initializeZKBasedSystemTrackers() throws IOException, InterruptedException,
|
||||||
KeeperException, ReplicationException {
|
KeeperException, ReplicationException {
|
||||||
super.initializeZKBasedSystemTrackers();
|
super.initializeZKBasedSystemTrackers();
|
||||||
// Record a newer server in server manager at first
|
// Record a newer server in server manager at first
|
||||||
getServerManager().recordNewServerWithLock(newServer,
|
getServerManager().recordNewServerWithLock(newServer,
|
||||||
new ServerLoad(ServerMetricsBuilder.of(newServer)));
|
new ServerLoad(ServerMetricsBuilder.of(newServer)));
|
||||||
|
|
||||||
List<ServerName> onlineServers = new ArrayList<>();
|
|
||||||
onlineServers.add(deadServer);
|
|
||||||
onlineServers.add(newServer);
|
|
||||||
// Mock the region server tracker to pull the dead server from zk
|
|
||||||
regionServerTracker = Mockito.spy(regionServerTracker);
|
|
||||||
Mockito.doReturn(onlineServers).when(
|
|
||||||
regionServerTracker).getOnlineServers();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -60,7 +60,7 @@ public class TestShutdownBackupMaster {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void initClusterSchemaService() throws IOException, InterruptedException {
|
protected void initClusterSchemaService() throws IOException, InterruptedException {
|
||||||
if (ARRIVE != null) {
|
if (ARRIVE != null) {
|
||||||
ARRIVE.countDown();
|
ARRIVE.countDown();
|
||||||
CONTINUE.await();
|
CONTINUE.await();
|
||||||
|
|
Loading…
Reference in New Issue