HBASE-18516 Removed dead code in ServerManager, mostly resulting from the AMv2 refactoring
* Calls to sendRegionOpen(), isServerReachable(), removeRequeuedDeadServers(), and getRequeuedDeadServers() were removed in HBASE-14614
* The call to ServerManager.sendFavoredNodes() was removed in HBASE-17198
commit 55a754e6cb
parent 2b1b01fce3
ServerManager.java

@@ -1,4 +1,4 @@
-/**
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master;
 import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
 
 import java.io.IOException;
-import java.net.ConnectException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -53,31 +52,18 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.ipc.FailedServerException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ServerInfo;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -157,7 +143,6 @@ public class ServerManager {
   private final long maxSkew;
   private final long warningSkew;
 
-  private final RetryCounterFactory pingRetryCounterFactory;
   private final RpcControllerFactory rpcControllerFactory;
 
   /**
@@ -203,21 +188,16 @@ public class ServerManager {
    * @param master
    * @throws ZooKeeperConnectionException
    */
-  public ServerManager(final MasterServices master) throws IOException {
+  public ServerManager(final MasterServices master) {
     this(master, true);
   }
 
-  ServerManager(final MasterServices master, final boolean connect) throws IOException {
+  ServerManager(final MasterServices master, final boolean connect) {
     this.master = master;
     Configuration c = master.getConfiguration();
     maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
     warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
     this.connection = connect ? master.getClusterConnection() : null;
-    int pingMaxAttempts = Math.max(1, master.getConfiguration().getInt(
-      "hbase.master.maximum.ping.server.attempts", 10));
-    int pingSleepInterval = Math.max(1, master.getConfiguration().getInt(
-      "hbase.master.ping.server.retry.sleep.interval", 100));
-    this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval);
     this.rpcControllerFactory = this.connection == null
         ? null
         : connection.getRpcControllerFactory();
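As a side note on the constructor context in the hunk above: the master reads its clock-skew limits from the two configuration keys shown there, with the defaults visible in the diff. Below is a minimal, self-contained sketch of reading and overriding those keys through the standard HBase Configuration API; the class name and the override value are illustrative only and not part of this change.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Illustrative sketch only; not part of the patch above.
public class ClockSkewConfigSketch {
  public static void main(String[] args) {
    // Loads hbase-default.xml / hbase-site.xml from the classpath if present.
    Configuration conf = HBaseConfiguration.create();

    // Same keys and defaults as in the ServerManager constructor shown above.
    long maxSkew = conf.getLong("hbase.master.maxclockskew", 30000);
    long warningSkew = conf.getLong("hbase.master.warningclockskew", 10000);
    System.out.println("max clock skew: " + maxSkew + " ms, warn at: " + warningSkew + " ms");

    // An override would normally live in hbase-site.xml; setting it here
    // programmatically just shows the key in use (the value is hypothetical).
    conf.setLong("hbase.master.maxclockskew", 60000);
  }
}
```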
@@ -251,7 +231,7 @@ public class ServerManager {
     // Test for case where we get a region startup message from a regionserver
     // that has been quickly restarted but whose znode expiration handler has
     // not yet run, or from a server whose fail we are currently processing.
-    // Test its host+port combo is present in serverAddresstoServerInfo. If it
+    // Test its host+port combo is present in serverAddressToServerInfo. If it
     // is, reject the server and trigger its expiration. The next time it comes
     // in, it should have been removed from serverAddressToServerInfo and queued
     // for processing by ProcessServerShutdown.
@@ -443,7 +423,7 @@ public class ServerManager {
   /**
    * Adds the onlineServers list. onlineServers should be locked.
    * @param serverName The remote servers name.
-   * @param s
+   * @param sl
    */
   @VisibleForTesting
   void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
@@ -722,117 +702,6 @@ public class ServerManager {
 
   // RPC methods to region servers
 
-  /**
-   * Sends an OPEN RPC to the specified server to open the specified region.
-   * <p>
-   * Open should not fail but can if server just crashed.
-   * <p>
-   * @param server server to open a region
-   * @param region region to open
-   * @param favoredNodes
-   */
-  public RegionOpeningState sendRegionOpen(final ServerName server,
-      HRegionInfo region, List<ServerName> favoredNodes)
-  throws IOException {
-    AdminService.BlockingInterface admin = getRsAdmin(server);
-    if (admin == null) {
-      throw new IOException("Attempting to send OPEN RPC to server " + server.toString() +
-        " failed because no RPC connection found to this server");
-    }
-    OpenRegionRequest request =
-        RequestConverter.buildOpenRegionRequest(server, region, favoredNodes, false);
-    try {
-      OpenRegionResponse response = admin.openRegion(null, request);
-      return ResponseConverter.getRegionOpeningState(response);
-    } catch (ServiceException se) {
-      checkForRSznode(server, se);
-      throw ProtobufUtil.getRemoteException(se);
-    }
-  }
-
-  /**
-   * Check for an odd state, where we think an RS is up but it is not. Do it on OPEN.
-   * This is only case where the check makes sense.
-   *
-   * <p>We are checking for instance of HBASE-9593 where a RS registered but died before it put
-   * up its znode in zk. In this case, the RS made it into the list of online servers but it
-   * is not actually UP. We do the check here where there is an evident problem rather
-   * than do some crazy footwork where we'd have master check zk after a RS had reported
-   * for duty with provisional state followed by a confirmed state; that'd be a mess.
-   * Real fix is HBASE-17733.
-   */
-  private void checkForRSznode(final ServerName serverName, final ServiceException se) {
-    if (se.getCause() == null) return;
-    Throwable t = se.getCause();
-    if (t instanceof ConnectException) {
-      // If this, proceed to do cleanup.
-    } else {
-      // Look for FailedServerException
-      if (!(t instanceof IOException)) return;
-      if (t.getCause() == null) return;
-      if (!(t.getCause() instanceof FailedServerException)) return;
-      // Ok, found FailedServerException -- continue.
-    }
-    if (!isServerOnline(serverName)) return;
-    // We think this server is online. Check it has a znode up. Currently, a RS
-    // registers an ephereral znode in zk. If not present, something is up. Maybe
-    // HBASE-9593 where RS crashed AFTER reportForDuty but BEFORE it put up an ephemeral
-    // znode.
-    List<String> servers = null;
-    try {
-      servers = getRegionServersInZK(this.master.getZooKeeper());
-    } catch (KeeperException ke) {
-      LOG.warn("Failed to list regionservers", ke);
-      // ZK is malfunctioning, don't hang here
-    }
-    boolean found = false;
-    if (servers != null) {
-      for (String serverNameAsStr: servers) {
-        ServerName sn = ServerName.valueOf(serverNameAsStr);
-        if (sn.equals(serverName)) {
-          // Found a server up in zk.
-          found = true;
-          break;
-        }
-      }
-    }
-    if (!found) {
-      LOG.warn("Online server " + serverName.toString() + " has no corresponding " +
-        "ephemeral znode (Did it die before registering in zk?); " +
-        "calling expire to clean it up!");
-      expireServer(serverName);
-    }
-  }
-
-  /**
-   * Sends an OPEN RPC to the specified server to open the specified region.
-   * <p>
-   * Open should not fail but can if server just crashed.
-   * <p>
-   * @param server server to open a region
-   * @param regionOpenInfos info of a list of regions to open
-   * @return a list of region opening states
-   */
-  public List<RegionOpeningState> sendRegionOpen(ServerName server,
-      List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos)
-  throws IOException {
-    AdminService.BlockingInterface admin = getRsAdmin(server);
-    if (admin == null) {
-      throw new IOException("Attempting to send OPEN RPC to server " + server.toString() +
-        " failed because no RPC connection found to this server");
-    }
-
-    OpenRegionRequest request =
-        RequestConverter.buildOpenRegionRequest(server, regionOpenInfos, false);
-    try {
-      OpenRegionResponse response = admin.openRegion(null, request);
-      return ResponseConverter.getRegionOpeningStateList(response);
-    } catch (ServiceException se) {
-      checkForRSznode(server, se);
-      throw ProtobufUtil.getRemoteException(se);
-    }
-  }
-
   private HBaseRpcController newRpcController() {
     return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
   }
@@ -892,41 +761,11 @@ public class ServerManager {
   }
 
   /**
-   * Check if a region server is reachable and has the expected start code
+   * @param sn
+   * @return Admin interface for the remote regionserver named <code>sn</code>
+   * @throws IOException
+   * @throws RetriesExhaustedException wrapping a ConnectException if failed
    */
-  public boolean isServerReachable(ServerName server) {
-    if (server == null) throw new NullPointerException("Passed server is null");
-
-
-    RetryCounter retryCounter = pingRetryCounterFactory.create();
-    while (retryCounter.shouldRetry()) {
-      try {
-        HBaseRpcController controller = newRpcController();
-        AdminService.BlockingInterface admin = getRsAdmin(server);
-        if (admin != null) {
-          ServerInfo info = ProtobufUtil.getServerInfo(controller, admin);
-          return info != null && info.hasServerName()
-            && server.getStartcode() == info.getServerName().getStartCode();
-        }
-      } catch (IOException ioe) {
-        LOG.debug("Couldn't reach " + server + ", try=" + retryCounter.getAttemptTimes()
-          + " of " + retryCounter.getMaxAttempts(), ioe);
-        try {
-          retryCounter.sleepUntilNextRetry();
-        } catch(InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
-      }
-    }
-    return false;
-  }
-
-  /**
-   * @param sn
-   * @return Admin interface for the remote regionserver named <code>sn</code>
-   * @throws IOException
-   * @throws RetriesExhaustedException wrapping a ConnectException if failed
-   */
   public AdminService.BlockingInterface getRsAdmin(final ServerName sn)
   throws IOException {
     AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
@@ -1088,23 +927,6 @@ public class ServerManager {
     return new HashSet<>(this.queuedDeadServers);
   }
 
-  /**
-   * During startup, if we figure it is not a failover, i.e. there is
-   * no more WAL files to split, we won't try to recover these dead servers.
-   * So we just remove them from the queue. Use caution in calling this.
-   */
-  void removeRequeuedDeadServers() {
-    requeuedDeadServers.clear();
-  }
-
-  /**
-   * @return A copy of the internal map of requeuedDeadServers servers and their corresponding
-   *         splitlog need flag.
-   */
-  Map<ServerName, Boolean> getRequeuedDeadServers() {
-    return Collections.unmodifiableMap(this.requeuedDeadServers);
-  }
-
   public boolean isServerOnline(ServerName serverName) {
     return serverName != null && onlineServers.containsKey(serverName);
   }
@@ -1157,11 +979,7 @@ public class ServerManager {
 
     // Loop through the draining server list and remove them from the server list
     final List<ServerName> drainingServersCopy = getDrainingServersList();
-    if (!drainingServersCopy.isEmpty()) {
-      for (final ServerName server: drainingServersCopy) {
-        destServers.remove(server);
-      }
-    }
+    destServers.removeAll(drainingServersCopy);
 
     // Remove the deadNotExpired servers from the server list.
     removeDeadNotExpiredServers(destServers);
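The draining-servers hunk above replaces an isEmpty() guard and a per-element removal loop with a single removeAll() call. A small, self-contained sketch of why that is safe: removeAll() is a no-op on an empty argument, so the guard is unnecessary, and for the duplicate-free server lists involved both forms yield the same result. Plain strings stand in for ServerName instances here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; strings stand in for ServerName instances.
public class RemoveAllSketch {
  public static void main(String[] args) {
    List<String> destServers =
        new ArrayList<>(Arrays.asList("rs1,16020,1", "rs2,16020,1", "rs3,16020,1"));
    List<String> drainingServersCopy = Arrays.asList("rs2,16020,1");

    // Old shape: explicit guard plus per-element removal.
    List<String> viaLoop = new ArrayList<>(destServers);
    if (!drainingServersCopy.isEmpty()) {
      for (String server : drainingServersCopy) {
        viaLoop.remove(server);
      }
    }

    // New shape: a single bulk call; it is a no-op when the argument is empty,
    // so the isEmpty() check can be dropped.
    List<String> viaRemoveAll = new ArrayList<>(destServers);
    viaRemoveAll.removeAll(drainingServersCopy);

    System.out.println(viaLoop.equals(viaRemoveAll)); // prints: true
  }
}
```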
@@ -1218,25 +1036,4 @@ public class ServerManager {
       removeRegion(hri);
     }
   }
-
-  public void sendFavoredNodes(final ServerName server,
-      Map<HRegionInfo, List<ServerName>> favoredNodes) throws IOException {
-    AdminService.BlockingInterface admin = getRsAdmin(server);
-    if (admin == null) {
-      LOG.warn("Attempting to send favored nodes update rpc to server " + server.toString()
-          + " failed because no RPC connection found to this server");
-    } else {
-      List<Pair<HRegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>();
-      for (Entry<HRegionInfo, List<ServerName>> entry : favoredNodes.entrySet()) {
-        regionUpdateInfos.add(new Pair<>(entry.getKey(), entry.getValue()));
-      }
-      UpdateFavoredNodesRequest request =
-          RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
-      try {
-        admin.updateFavoredNodes(null, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
-      }
-    }
-  }
 }