HBASE-21266 Not running balancer because processing dead regionservers, but empty dead rs list

Signed-off-by: Michael Stack <stack@apache.org>
Andrew Purtell 2018-10-11 15:28:36 -07:00
parent ebad3ab8ed
commit ea90846268
4 changed files with 90 additions and 38 deletions
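
In brief, the patch replaces DeadServer's "int numProcessing" counter and "boolean processing" flag with a Set<ServerName> of servers currently being processed, so that areDeadServersInProgress() is derived from actual set membership and can no longer report true while the dead-server list is empty. Below is a minimal, stand-alone sketch of that bookkeeping idea; it is a simplified illustration using String keys, not the patched class itself:

import java.util.HashSet;
import java.util.Set;

// Illustration of the set-based tracking introduced by the patch. A bare
// counter/flag can be left stuck "in progress" if increments and decrements
// ever get out of step; a set keyed by server name is idempotent on both
// add and remove, and "in progress" is simply non-emptiness.
class ProcessingTrackerSketch {
  private final Set<String> processingServers = new HashSet<String>();

  synchronized void startProcessing(String serverName) {
    processingServers.add(serverName);    // adding the same server twice is a no-op
  }

  synchronized void finish(String serverName) {
    processingServers.remove(serverName); // removing twice cannot go "negative"
  }

  synchronized boolean areDeadServersInProgress() {
    return !processingServers.isEmpty();  // true only while some server remains
  }
}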

DeadServer.java

@@ -18,12 +18,7 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
+import com.google.common.base.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,6 +31,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
 /**
  * Class to hold dead servers list and utility querying dead server list.
  * On znode expiration, servers are added here.
@@ -54,14 +56,9 @@ public class DeadServer {
   private final Map<ServerName, Long> deadServers = new HashMap<ServerName, Long>();
 
   /**
-   * Number of dead servers currently being processed
+   * Set of dead servers currently being processed
    */
-  private int numProcessing = 0;
-
-  /**
-   * Whether a dead server is being processed currently.
-   */
-  private boolean processing = false;
+  private final Set<ServerName> processingServers = new HashSet<ServerName>();
 
   /**
    * A dead server that comes back alive has a different start code. The new start code should be
@@ -76,7 +73,13 @@
     while (it.hasNext()) {
       ServerName sn = it.next();
       if (ServerName.isSameHostnameAndPort(sn, newServerName)) {
+        // remove from deadServers
         it.remove();
+        // remove from processingServers
+        boolean removed = processingServers.remove(sn);
+        if (removed) {
+          LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
+        }
         return true;
       }
     }
@@ -92,6 +95,14 @@
     return deadServers.containsKey(serverName);
   }
 
+  /**
+   * @param serverName server name.
+   * @return true if this server is on the processing servers list false otherwise
+   */
+  public synchronized boolean isProcessingServer(final ServerName serverName) {
+    return processingServers.contains(serverName);
+  }
+
   /**
    * Checks if there are currently any dead servers being processed by the
    * master. Returns true if at least one region server is currently being
@@ -99,7 +110,9 @@
    *
    * @return true if any RS are being processed as dead
    */
-  public synchronized boolean areDeadServersInProgress() { return processing; }
+  public synchronized boolean areDeadServersInProgress() {
+    return !processingServers.isEmpty();
+  }
 
   public synchronized Set<ServerName> copyServerNames() {
     Set<ServerName> clone = new HashSet<ServerName>(deadServers.size());
@@ -112,10 +125,13 @@
    * @param sn the server name
    */
   public synchronized void add(ServerName sn) {
-    processing = true;
     if (!deadServers.containsKey(sn)){
       deadServers.put(sn, EnvironmentEdgeManager.currentTime());
     }
+    boolean added = processingServers.add(sn);
+    if (LOG.isDebugEnabled() && added) {
+      LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size());
+    }
   }
 
   /**
@@ -123,18 +139,27 @@
    * @param sn ServerName for the dead server.
    */
   public synchronized void notifyServer(ServerName sn) {
-    if (LOG.isDebugEnabled()) { LOG.debug("Started processing " + sn); }
-    processing = true;
-    numProcessing++;
+    boolean added = processingServers.add(sn);
+    if (LOG.isDebugEnabled()) {
+      if (added) {
+        LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size());
+      }
+      LOG.debug("Started processing " + sn + "; numProcessing=" + processingServers.size());
+    }
   }
 
+  /**
+   * Complete processing for this dead server.
+   * @param sn ServerName for the dead server.
+   */
   public synchronized void finish(ServerName sn) {
-    numProcessing--;
-    if (LOG.isDebugEnabled()) LOG.debug("Finished " + sn + "; numProcessing=" + numProcessing);
-
-    assert numProcessing >= 0: "Number of dead servers in processing should always be non-negative";
-
-    if (numProcessing == 0) { processing = false; }
+    boolean removed = processingServers.remove(sn);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Finished processing " + sn + "; numProcessing=" + processingServers.size());
+      if (removed) {
+        LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
+      }
+    }
   }
 
   public synchronized int size() {
@@ -150,20 +175,37 @@
     while (it.hasNext()) {
       ServerName sn = it.next();
       if (ServerName.isSameHostnameAndPort(sn, newServerName)) {
+        // remove from deadServers
         it.remove();
+        // remove from processingServers
+        boolean removed = processingServers.remove(sn);
+        if (removed) {
+          LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
+        }
       }
     }
   }
 
   @Override
   public synchronized String toString() {
+    // Display unified set of servers from both maps
+    Set<ServerName> servers = new HashSet<ServerName>();
+    servers.addAll(deadServers.keySet());
+    servers.addAll(processingServers);
     StringBuilder sb = new StringBuilder();
-    for (ServerName sn : deadServers.keySet()) {
+    for (ServerName sn : servers) {
       if (sb.length() > 0) {
         sb.append(", ");
       }
       sb.append(sn.toString());
+      // Star entries that are being processed
+      if (processingServers.contains(sn)) {
+        sb.append("*");
+      }
     }
+    sb.append(" (numProcessing=");
+    sb.append(processingServers.size());
+    sb.append(')');
     return sb.toString();
   }
@@ -210,6 +252,9 @@
    * @return true if this server was removed
    */
   public synchronized boolean removeDeadServer(final ServerName deadServerName) {
+    Preconditions.checkState(!processingServers.contains(deadServerName),
+      "Asked to remove server still in processingServers set " + deadServerName +
+      " (numProcessing=" + processingServers.size() + ")");
     if (deadServers.remove(deadServerName) == null) {
       return false;
     }
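
The Preconditions.checkState guard above means a server must be finish()ed (removed from processingServers) before removeDeadServer() may drop it from the dead list, which is why the test changes below add d.finish(...) calls ahead of d.removeDeadServer(...). A hedged usage sketch of that call sequence follows; the method names come from the diff above, while the ServerName value and the wrapping class are made up for illustration:

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.DeadServer;

public class DeadServerUsageSketch {
  public static void main(String[] args) {
    DeadServer deadServers = new DeadServer();
    // Hypothetical server name in "hostname,port,startcode" form
    ServerName sn = ServerName.valueOf("example-host,16020,1539295716000");

    deadServers.add(sn);                       // dead, and queued as "processing"
    assert deadServers.areDeadServersInProgress();
    assert deadServers.isProcessingServer(sn);

    deadServers.finish(sn);                    // shutdown handling done for this server
    assert !deadServers.areDeadServersInProgress();

    deadServers.removeDeadServer(sn);          // legal only after finish();
                                               // otherwise checkState throws IllegalStateException
  }
}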

TestDeadServer.java

@@ -116,7 +116,6 @@ public class TestDeadServer
     DeadServer d = new DeadServer();
     d.add(hostname123);
     mee.incValue(1);
     d.add(hostname1234);
@@ -157,14 +156,17 @@
     d.add(hostname1234);
     Assert.assertEquals(2, d.size());
 
+    d.finish(hostname123);
     d.removeDeadServer(hostname123);
     Assert.assertEquals(1, d.size());
+    d.finish(hostname1234);
     d.removeDeadServer(hostname1234);
     Assert.assertTrue(d.isEmpty());
 
     d.add(hostname1234);
     Assert.assertFalse(d.removeDeadServer(hostname123_2));
     Assert.assertEquals(1, d.size());
+    d.finish(hostname1234);
     Assert.assertTrue(d.removeDeadServer(hostname1234));
     Assert.assertTrue(d.isEmpty());
   }

TestEndToEndSplitTransaction.java

@@ -459,7 +459,7 @@ public class TestEndToEndSplitTransaction {
     Throwable ex;
 
     RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException {
-      super("RegionChecker", stopper, 10);
+      super("RegionChecker", stopper, 100);
       this.conf = conf;
       this.tableName = tableName;
@@ -669,7 +669,7 @@
           log("found region in META: " + hri.getRegionNameAsString());
           break;
         }
-        Threads.sleep(10);
+        Threads.sleep(100);
       }
     }
@@ -690,7 +690,7 @@
         } catch (IOException ex) {
           // wait some more
         }
-        Threads.sleep(10);
+        Threads.sleep(100);
       }
     }
   }

TestSplitTransactionOnCluster.java

@@ -1380,15 +1380,20 @@ public class TestSplitTransactionOnCluster {
     regionServer.kill();
     cluster.getRegionServerThreads().get(serverWith).join();
     // Wait until finish processing of shutdown
-    while (cluster.getMaster().getServerManager().areDeadServersInProgress()) {
-      Thread.sleep(10);
-    }
-    AssignmentManager am = cluster.getMaster().getAssignmentManager();
-    while(am.getRegionStates().isRegionsInTransition()) {
-      Thread.sleep(10);
-    }
-    assertEquals(am.getRegionStates().getRegionsInTransition().toString(), 0, am
-        .getRegionStates().getRegionsInTransition().size());
+    TESTING_UTIL.waitFor(60000, 1000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return !cluster.getMaster().getServerManager().areDeadServersInProgress();
+      }
+    });
+    // Wait until there are no more regions in transition
+    TESTING_UTIL.waitFor(60000, 1000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return !cluster.getMaster().getAssignmentManager().
+            getRegionStates().isRegionsInTransition();
+      }
+    });
     regionDirs =
       FSUtils.getRegionDirs(tableDir.getFileSystem(cluster.getConfiguration()), tableDir);
     assertEquals(1,regionDirs.size());