HBASE-10210 during master startup, RS can be you-are-dead-ed by master in error

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1555275 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
sershe 2014-01-03 23:08:44 +00:00
parent 886e8e4816
commit 2a14e4d0fa
4 changed files with 40 additions and 36 deletions

View File

@ -827,11 +827,10 @@ MasterServices, Server {
this.serverManager.waitForRegionServers(status); this.serverManager.waitForRegionServers(status);
// Check zk for region servers that are up but didn't register // Check zk for region servers that are up but didn't register
for (ServerName sn: this.regionServerTracker.getOnlineServers()) { for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
// The isServerOnline check is opportunistic, correctness is handled inside
if (!this.serverManager.isServerOnline(sn) if (!this.serverManager.isServerOnline(sn)
&& serverManager.checkAlreadySameHostPortAndRecordNewServer( && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
sn, ServerLoad.EMPTY_SERVERLOAD)) { LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
LOG.info("Registered server found up in zk but who has not yet "
+ "reported in: " + sn);
} }
} }

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Triple; import org.apache.hadoop.hbase.util.Triple;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
/** /**
@ -109,7 +110,7 @@ public class ServerManager {
new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR); new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
/** Map of registered servers to their current load */ /** Map of registered servers to their current load */
private final Map<ServerName, ServerLoad> onlineServers = private final ConcurrentHashMap<ServerName, ServerLoad> onlineServers =
new ConcurrentHashMap<ServerName, ServerLoad>(); new ConcurrentHashMap<ServerName, ServerLoad>();
/** /**
@ -214,8 +215,7 @@ public class ServerManager {
ServerName sn = ServerName.valueOf(ia.getHostName(), port, serverStartcode); ServerName sn = ServerName.valueOf(ia.getHostName(), port, serverStartcode);
checkClockSkew(sn, serverCurrentTime); checkClockSkew(sn, serverCurrentTime);
checkIsDead(sn, "STARTUP"); checkIsDead(sn, "STARTUP");
if (!checkAlreadySameHostPortAndRecordNewServer( if (!checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
sn, ServerLoad.EMPTY_SERVERLOAD)) {
LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup" LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
+ " could not record the server: " + sn); + " could not record the server: " + sn);
} }
@ -252,19 +252,17 @@ public class ServerManager {
void regionServerReport(ServerName sn, void regionServerReport(ServerName sn,
ServerLoad sl) throws YouAreDeadException { ServerLoad sl) throws YouAreDeadException {
checkIsDead(sn, "REPORT"); checkIsDead(sn, "REPORT");
if (!this.onlineServers.containsKey(sn)) { if (null == this.onlineServers.replace(sn, sl)) {
// Already have this host+port combo and its just different start code? // Already have this host+port combo and its just different start code?
// Just let the server in. Presume master joining a running cluster. // Just let the server in. Presume master joining a running cluster.
// recordNewServer is what happens at the end of reportServerStartup. // recordNewServer is what happens at the end of reportServerStartup.
// The only thing we are skipping is passing back to the regionserver // The only thing we are skipping is passing back to the regionserver
// the ServerName to use. Here we presume a master has already done // the ServerName to use. Here we presume a master has already done
// that so we'll press on with whatever it gave us for ServerName. // that so we'll press on with whatever it gave us for ServerName.
if (!checkAlreadySameHostPortAndRecordNewServer(sn, sl)) { if (!checkAndRecordNewServer(sn, sl)) {
LOG.info("RegionServerReport ignored, could not record the sever: " + sn); LOG.info("RegionServerReport ignored, could not record the server: " + sn);
return; // Not recorded, so no need to move on return; // Not recorded, so no need to move on
} }
} else {
this.onlineServers.put(sn, sl);
} }
updateLastFlushedSequenceIds(sn, sl); updateLastFlushedSequenceIds(sn, sl);
} }
@ -277,21 +275,25 @@ public class ServerManager {
* @param sl the server load on the server * @param sl the server load on the server
* @return true if the server is recorded, otherwise, false * @return true if the server is recorded, otherwise, false
*/ */
boolean checkAlreadySameHostPortAndRecordNewServer( boolean checkAndRecordNewServer(
final ServerName serverName, final ServerLoad sl) { final ServerName serverName, final ServerLoad sl) {
ServerName existingServer = findServerWithSameHostnamePort(serverName); ServerName existingServer = null;
if (existingServer != null) { synchronized (this.onlineServers) {
if (existingServer.getStartcode() > serverName.getStartcode()) { existingServer = findServerWithSameHostnamePortWithLock(serverName);
LOG.info("Server serverName=" + serverName + if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
" rejected; we already have " + existingServer.toString() + LOG.info("Server serverName=" + serverName + " rejected; we already have "
" registered with same hostname and port"); + existingServer.toString() + " registered with same hostname and port");
return false; return false;
} }
recordNewServerWithLock(serverName, sl);
}
// Note that we assume that same ts means same server, and don't expire in that case.
// TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
LOG.info("Triggering server recovery; existingServer " + LOG.info("Triggering server recovery; existingServer " +
existingServer + " looks stale, new server:" + serverName); existingServer + " looks stale, new server:" + serverName);
expireServer(existingServer); expireServer(existingServer);
} }
recordNewServer(serverName, sl);
return true; return true;
} }
@ -350,22 +352,25 @@ public class ServerManager {
} }
/** /**
* Assumes onlineServers is locked.
* @return ServerName with matching hostname and port. * @return ServerName with matching hostname and port.
*/ */
private ServerName findServerWithSameHostnamePort( private ServerName findServerWithSameHostnamePortWithLock(
final ServerName serverName) { final ServerName serverName) {
for (ServerName sn: getOnlineServersList()) { for (ServerName sn: this.onlineServers.keySet()) {
if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn; if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn;
} }
return null; return null;
} }
/** /**
* Adds the onlineServers list. * Adds the onlineServers list. onlineServers should be locked.
* @param serverName The remote servers name. * @param serverName The remote servers name.
* @param sl * @param sl
* @return Server load from the removed server, if any.
*/ */
void recordNewServer(final ServerName serverName, final ServerLoad sl) { @VisibleForTesting
void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
LOG.info("Registering server=" + serverName); LOG.info("Registering server=" + serverName);
this.onlineServers.put(serverName, sl); this.onlineServers.put(serverName, sl);
this.rsAdmins.remove(serverName); this.rsAdmins.remove(serverName);
@ -440,6 +445,7 @@ public class ServerManager {
if (System.currentTimeMillis() > (previousLogTime + 1000)) { if (System.currentTimeMillis() > (previousLogTime + 1000)) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
// It's ok here to not sync on onlineServers - merely logging
for (ServerName key : this.onlineServers.keySet()) { for (ServerName key : this.onlineServers.keySet()) {
if (sb.length() > 0) { if (sb.length() > 0) {
sb.append(", "); sb.append(", ");
@ -471,22 +477,21 @@ public class ServerManager {
this.queuedDeadServers.add(serverName); this.queuedDeadServers.add(serverName);
return; return;
} }
if (!this.onlineServers.containsKey(serverName)) {
LOG.warn("Expiration of " + serverName +
" but server not online");
}
if (this.deadservers.isDeadServer(serverName)) { if (this.deadservers.isDeadServer(serverName)) {
// TODO: Can this happen? It shouldn't be online in this case? // TODO: Can this happen? It shouldn't be online in this case?
LOG.warn("Expiration of " + serverName + LOG.warn("Expiration of " + serverName +
" but server shutdown already in progress"); " but server shutdown already in progress");
return; return;
} }
// Remove the server from the known servers lists and update load info BUT
// add to deadservers first; do this so it'll show in dead servers list if
// not in online servers list.
this.deadservers.add(serverName);
this.onlineServers.remove(serverName);
synchronized (onlineServers) { synchronized (onlineServers) {
if (!this.onlineServers.containsKey(serverName)) {
LOG.warn("Expiration of " + serverName + " but server not online");
}
// Remove the server from the known servers lists and update load info BUT
// add to deadservers first; do this so it'll show in dead servers list if
// not in online servers list.
this.deadservers.add(serverName);
this.onlineServers.remove(serverName);
onlineServers.notifyAll(); onlineServers.notifyAll();
} }
this.rsAdmins.remove(serverName); this.rsAdmins.remove(serverName);

View File

@ -160,7 +160,7 @@ public class TestAssignmentManagerOnCluster {
// Created faked dead server // Created faked dead server
deadServer = ServerName.valueOf(destServer.getHostname(), deadServer = ServerName.valueOf(destServer.getHostname(),
destServer.getPort(), destServer.getStartcode() - 100L); destServer.getPort(), destServer.getStartcode() - 100L);
master.serverManager.recordNewServer(deadServer, ServerLoad.EMPTY_SERVERLOAD); master.serverManager.recordNewServerWithLock(deadServer, ServerLoad.EMPTY_SERVERLOAD);
AssignmentManager am = master.getAssignmentManager(); AssignmentManager am = master.getAssignmentManager();
RegionPlan plan = new RegionPlan(hri, null, deadServer); RegionPlan plan = new RegionPlan(hri, null, deadServer);

View File

@ -372,7 +372,7 @@ public class TestMasterNoCluster {
InterruptedException, KeeperException { InterruptedException, KeeperException {
super.initializeZKBasedSystemTrackers(); super.initializeZKBasedSystemTrackers();
// Record a newer server in server manager at first // Record a newer server in server manager at first
serverManager.recordNewServer(newServer, ServerLoad.EMPTY_SERVERLOAD); serverManager.recordNewServerWithLock(newServer, ServerLoad.EMPTY_SERVERLOAD);
List<ServerName> onlineServers = new ArrayList<ServerName>(); List<ServerName> onlineServers = new ArrayList<ServerName>();
onlineServers.add(deadServer); onlineServers.add(deadServer);