diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 7369cc34660..fb048c02f31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -827,11 +827,10 @@ MasterServices, Server { this.serverManager.waitForRegionServers(status); // Check zk for region servers that are up but didn't register for (ServerName sn: this.regionServerTracker.getOnlineServers()) { + // The isServerOnline check is opportunistic, correctness is handled inside if (!this.serverManager.isServerOnline(sn) - && serverManager.checkAlreadySameHostPortAndRecordNewServer( - sn, ServerLoad.EMPTY_SERVERLOAD)) { - LOG.info("Registered server found up in zk but who has not yet " - + "reported in: " + sn); + && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) { + LOG.info("Registered server found up in zk but who has not yet reported in: " + sn); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 84995203d0d..72f7dc437f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Triple; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ServiceException; /** @@ -109,7 +110,7 @@ public class ServerManager { new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); /** Map of registered servers to their current load */ - private final Map onlineServers = + private final ConcurrentHashMap onlineServers = new ConcurrentHashMap(); /** @@ -214,8 +215,7 @@ public class ServerManager { ServerName sn = ServerName.valueOf(ia.getHostName(), port, serverStartcode); checkClockSkew(sn, serverCurrentTime); checkIsDead(sn, "STARTUP"); - if (!checkAlreadySameHostPortAndRecordNewServer( - sn, ServerLoad.EMPTY_SERVERLOAD)) { + if (!checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) { LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn); } @@ -252,19 +252,17 @@ public class ServerManager { void regionServerReport(ServerName sn, ServerLoad sl) throws YouAreDeadException { checkIsDead(sn, "REPORT"); - if (!this.onlineServers.containsKey(sn)) { + if (null == this.onlineServers.replace(sn, sl)) { // Already have this host+port combo and its just different start code? // Just let the server in. Presume master joining a running cluster. // recordNewServer is what happens at the end of reportServerStartup. // The only thing we are skipping is passing back to the regionserver // the ServerName to use. Here we presume a master has already done // that so we'll press on with whatever it gave us for ServerName. - if (!checkAlreadySameHostPortAndRecordNewServer(sn, sl)) { - LOG.info("RegionServerReport ignored, could not record the sever: " + sn); + if (!checkAndRecordNewServer(sn, sl)) { + LOG.info("RegionServerReport ignored, could not record the server: " + sn); return; // Not recorded, so no need to move on } - } else { - this.onlineServers.put(sn, sl); } updateLastFlushedSequenceIds(sn, sl); } @@ -277,21 +275,25 @@ public class ServerManager { * @param sl the server load on the server * @return true if the server is recorded, otherwise, false */ - boolean checkAlreadySameHostPortAndRecordNewServer( + boolean checkAndRecordNewServer( final ServerName serverName, final ServerLoad sl) { - ServerName existingServer = findServerWithSameHostnamePort(serverName); - if (existingServer != null) { - if (existingServer.getStartcode() > serverName.getStartcode()) { - LOG.info("Server serverName=" + serverName + - " rejected; we already have " + existingServer.toString() + - " registered with same hostname and port"); + ServerName existingServer = null; + synchronized (this.onlineServers) { + existingServer = findServerWithSameHostnamePortWithLock(serverName); + if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) { + LOG.info("Server serverName=" + serverName + " rejected; we already have " + + existingServer.toString() + " registered with same hostname and port"); return false; } + recordNewServerWithLock(serverName, sl); + } + // Note that we assume that same ts means same server, and don't expire in that case. + // TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky. + if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) { LOG.info("Triggering server recovery; existingServer " + - existingServer + " looks stale, new server:" + serverName); + existingServer + " looks stale, new server:" + serverName); expireServer(existingServer); } - recordNewServer(serverName, sl); return true; } @@ -350,22 +352,25 @@ public class ServerManager { } /** + * Assumes onlineServers is locked. * @return ServerName with matching hostname and port. */ - private ServerName findServerWithSameHostnamePort( + private ServerName findServerWithSameHostnamePortWithLock( final ServerName serverName) { - for (ServerName sn: getOnlineServersList()) { + for (ServerName sn: this.onlineServers.keySet()) { if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn; } return null; } /** - * Adds the onlineServers list. + * Adds the onlineServers list. onlineServers should be locked. * @param serverName The remote servers name. * @param sl + * @return Server load from the removed server, if any. */ - void recordNewServer(final ServerName serverName, final ServerLoad sl) { + @VisibleForTesting + void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) { LOG.info("Registering server=" + serverName); this.onlineServers.put(serverName, sl); this.rsAdmins.remove(serverName); @@ -440,6 +445,7 @@ public class ServerManager { if (System.currentTimeMillis() > (previousLogTime + 1000)) { StringBuilder sb = new StringBuilder(); + // It's ok here to not sync on onlineServers - merely logging for (ServerName key : this.onlineServers.keySet()) { if (sb.length() > 0) { sb.append(", "); @@ -471,22 +477,21 @@ public class ServerManager { this.queuedDeadServers.add(serverName); return; } - if (!this.onlineServers.containsKey(serverName)) { - LOG.warn("Expiration of " + serverName + - " but server not online"); - } if (this.deadservers.isDeadServer(serverName)) { // TODO: Can this happen? It shouldn't be online in this case? LOG.warn("Expiration of " + serverName + " but server shutdown already in progress"); return; } - // Remove the server from the known servers lists and update load info BUT - // add to deadservers first; do this so it'll show in dead servers list if - // not in online servers list. - this.deadservers.add(serverName); - this.onlineServers.remove(serverName); synchronized (onlineServers) { + if (!this.onlineServers.containsKey(serverName)) { + LOG.warn("Expiration of " + serverName + " but server not online"); + } + // Remove the server from the known servers lists and update load info BUT + // add to deadservers first; do this so it'll show in dead servers list if + // not in online servers list. + this.deadservers.add(serverName); + this.onlineServers.remove(serverName); onlineServers.notifyAll(); } this.rsAdmins.remove(serverName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java index 532bb21b4a2..86b5ad6c28f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java @@ -160,7 +160,7 @@ public class TestAssignmentManagerOnCluster { // Created faked dead server deadServer = ServerName.valueOf(destServer.getHostname(), destServer.getPort(), destServer.getStartcode() - 100L); - master.serverManager.recordNewServer(deadServer, ServerLoad.EMPTY_SERVERLOAD); + master.serverManager.recordNewServerWithLock(deadServer, ServerLoad.EMPTY_SERVERLOAD); AssignmentManager am = master.getAssignmentManager(); RegionPlan plan = new RegionPlan(hri, null, deadServer); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java index 927531353c4..50678dabacd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java @@ -372,7 +372,7 @@ public class TestMasterNoCluster { InterruptedException, KeeperException { super.initializeZKBasedSystemTrackers(); // Record a newer server in server manager at first - serverManager.recordNewServer(newServer, ServerLoad.EMPTY_SERVERLOAD); + serverManager.recordNewServerWithLock(newServer, ServerLoad.EMPTY_SERVERLOAD); List onlineServers = new ArrayList(); onlineServers.add(deadServer);