HBASE-23632 DeadServer cleanup (#979)
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
parent
1b366719a0
commit
8cf5d045e0
|
@ -1,5 +1,4 @@
|
|||
/**
|
||||
*
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -20,7 +19,6 @@ package org.apache.hadoop.hbase.master;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -40,7 +38,13 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
|||
|
||||
/**
|
||||
* Class to hold the dead servers list and utilities for querying it.
|
||||
* On znode expiration, servers are added here.
|
||||
* Servers are added when they expire or when we find them in filesystem on startup.
|
||||
* When a server crash procedure is queued, it will populate the processing list and
|
||||
* then remove the server from processing list when done. Servers are removed from
|
||||
* dead server list when a new instance is started over the old on same hostname and
|
||||
* port or when new Master comes online tidying up after all initialization. Processing
|
||||
* list and deadserver list are not tied together (you don't have to be in deadservers
|
||||
* list to be processing and vice versa).
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class DeadServer {
|
||||
|
@ -56,37 +60,11 @@ public class DeadServer {
|
|||
private final Map<ServerName, Long> deadServers = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Set of dead servers currently being processed
|
||||
* Set of dead servers currently being processed by a SCP.
|
||||
* Added to this list at the start of SCP and removed after it is done
|
||||
* processing the crash.
|
||||
*/
|
||||
private final Set<ServerName> processingServers = new HashSet<ServerName>();
|
||||
|
||||
/**
|
||||
* Handles restart of a server. The new server instance has a different start code.
|
||||
* The new start code should be greater than the old one. We don't check that here.
|
||||
*
|
||||
* @param newServerName Servername as either <code>host:port</code> or
|
||||
* <code>host,port,startcode</code>.
|
||||
* @return true if this server was dead before and coming back alive again
|
||||
*/
|
||||
public synchronized boolean cleanPreviousInstance(final ServerName newServerName) {
|
||||
Iterator<ServerName> it = deadServers.keySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
ServerName sn = it.next();
|
||||
if (ServerName.isSameAddress(sn, newServerName)) {
|
||||
// remove from deadServers
|
||||
it.remove();
|
||||
// remove from processingServers
|
||||
boolean removed = processingServers.remove(sn);
|
||||
if (removed) {
|
||||
LOG.debug("Removed {}, processing={}, numProcessing={}", sn, removed,
|
||||
processingServers.size());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
private final Set<ServerName> processingServers = new HashSet<>();
|
||||
|
||||
/**
|
||||
* @param serverName server name.
|
||||
|
@ -96,14 +74,6 @@ public class DeadServer {
|
|||
return deadServers.containsKey(serverName);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param serverName server name.
|
||||
* @return true if this server is on the processing servers list false otherwise
|
||||
*/
|
||||
public synchronized boolean isProcessingServer(final ServerName serverName) {
|
||||
return processingServers.contains(serverName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if there are currently any dead servers being processed by the
|
||||
* master. Returns true if at least one region server is currently being
|
||||
|
@ -111,7 +81,7 @@ public class DeadServer {
|
|||
*
|
||||
* @return true if any RS are being processed as dead
|
||||
*/
|
||||
public synchronized boolean areDeadServersInProgress() {
|
||||
synchronized boolean areDeadServersInProgress() {
|
||||
return !processingServers.isEmpty();
|
||||
}
|
||||
|
||||
|
@ -124,41 +94,30 @@ public class DeadServer {
|
|||
/**
|
||||
* Adds the server to the dead server list if it's not there already.
|
||||
*/
|
||||
public synchronized void add(ServerName sn) {
|
||||
if (!deadServers.containsKey(sn)){
|
||||
deadServers.put(sn, EnvironmentEdgeManager.currentTime());
|
||||
}
|
||||
boolean added = processingServers.add(sn);
|
||||
if (LOG.isDebugEnabled() && added) {
|
||||
LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size());
|
||||
}
|
||||
synchronized void putIfAbsent(ServerName sn) {
|
||||
this.deadServers.putIfAbsent(sn, EnvironmentEdgeManager.currentTime());
|
||||
processing(sn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify that we started processing this dead server.
|
||||
* @param sn ServerName for the dead server.
|
||||
* Add <code>sn</code> to set of processing deadservers.
|
||||
* @see #finish(ServerName)
|
||||
*/
|
||||
public synchronized void notifyServer(ServerName sn) {
|
||||
boolean added = processingServers.add(sn);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (added) {
|
||||
LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size());
|
||||
}
|
||||
LOG.debug("Started processing " + sn + "; numProcessing=" + processingServers.size());
|
||||
public synchronized void processing(ServerName sn) {
|
||||
if (processingServers.add(sn)) {
|
||||
// Only log on add.
|
||||
LOG.debug("Processing {}; numProcessing={}", sn, processingServers.size());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete processing for this dead server.
|
||||
* @param sn ServerName for the dead server.
|
||||
* @see #processing(ServerName)
|
||||
*/
|
||||
public synchronized void finish(ServerName sn) {
|
||||
boolean removed = processingServers.remove(sn);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Finished processing " + sn + "; numProcessing=" + processingServers.size());
|
||||
if (removed) {
|
||||
LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
|
||||
}
|
||||
if (processingServers.remove(sn)) {
|
||||
LOG.debug("Removed {} from processing; numProcessing={}", sn, processingServers.size());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -166,30 +125,59 @@ public class DeadServer {
|
|||
return deadServers.size();
|
||||
}
|
||||
|
||||
public synchronized boolean isEmpty() {
|
||||
synchronized boolean isEmpty() {
|
||||
return deadServers.isEmpty();
|
||||
}
|
||||
|
||||
public synchronized void cleanAllPreviousInstances(final ServerName newServerName) {
|
||||
/**
|
||||
* Handles restart of a server. The new server instance has a different start code.
|
||||
* The new start code should be greater than the old one. We don't check that here.
|
||||
* Removes the old server from deadserver list.
|
||||
*
|
||||
* @param newServerName Servername as either <code>host:port</code> or
|
||||
* <code>host,port,startcode</code>.
|
||||
* @return true if this server was dead before and coming back alive again
|
||||
*/
|
||||
synchronized boolean cleanPreviousInstance(final ServerName newServerName) {
|
||||
Iterator<ServerName> it = deadServers.keySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
ServerName sn = it.next();
|
||||
if (ServerName.isSameAddress(sn, newServerName)) {
|
||||
// remove from deadServers
|
||||
it.remove();
|
||||
// remove from processingServers
|
||||
boolean removed = processingServers.remove(sn);
|
||||
if (removed) {
|
||||
LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
|
||||
}
|
||||
if (cleanOldServerName(newServerName, it)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
synchronized void cleanAllPreviousInstances(final ServerName newServerName) {
|
||||
Iterator<ServerName> it = deadServers.keySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
cleanOldServerName(newServerName, it);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param newServerName Server to match port and hostname against.
|
||||
* @param deadServerIterator Iterator primed so can call 'next' on it.
|
||||
* @return True if <code>newServerName</code> and current primed
|
||||
* iterator ServerName have same host and port and we removed old server
|
||||
* from iterator and from processing list.
|
||||
*/
|
||||
private boolean cleanOldServerName(ServerName newServerName,
|
||||
Iterator<ServerName> deadServerIterator) {
|
||||
ServerName sn = deadServerIterator.next();
|
||||
if (ServerName.isSameAddress(sn, newServerName)) {
|
||||
// Remove from dead servers list. Don't remove from the processing list --
|
||||
// let the SCP do it when it is done.
|
||||
deadServerIterator.remove();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String toString() {
|
||||
// Display unified set of servers from both maps
|
||||
Set<ServerName> servers = new HashSet<ServerName>();
|
||||
Set<ServerName> servers = new HashSet<>();
|
||||
servers.addAll(deadServers.keySet());
|
||||
servers.addAll(processingServers);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
@ -211,7 +199,7 @@ public class DeadServer {
|
|||
* @param ts the time, 0 for all
|
||||
* @return a sorted array list, by death time, lowest values first.
|
||||
*/
|
||||
public synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts){
|
||||
synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts) {
|
||||
List<Pair<ServerName, Long>> res = new ArrayList<>(size());
|
||||
|
||||
for (Map.Entry<ServerName, Long> entry:deadServers.entrySet()){
|
||||
|
@ -220,7 +208,7 @@ public class DeadServer {
|
|||
}
|
||||
}
|
||||
|
||||
Collections.sort(res, ServerNameDeathDateComparator);
|
||||
Collections.sort(res, (o1, o2) -> o1.getSecond().compareTo(o2.getSecond()));
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -234,28 +222,15 @@ public class DeadServer {
|
|||
return time == null ? null : new Date(time);
|
||||
}
|
||||
|
||||
private static Comparator<Pair<ServerName, Long>> ServerNameDeathDateComparator =
|
||||
new Comparator<Pair<ServerName, Long>>(){
|
||||
|
||||
@Override
|
||||
public int compare(Pair<ServerName, Long> o1, Pair<ServerName, Long> o2) {
|
||||
return o1.getSecond().compareTo(o2.getSecond());
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* remove the specified dead server
|
||||
* Called from rpc by operator cleaning up deadserver list.
|
||||
* @param deadServerName the dead server name
|
||||
* @return true if this server was removed
|
||||
*/
|
||||
|
||||
public synchronized boolean removeDeadServer(final ServerName deadServerName) {
|
||||
Preconditions.checkState(!processingServers.contains(deadServerName),
|
||||
"Asked to remove server still in processingServers set " + deadServerName +
|
||||
" (numProcessing=" + processingServers.size() + ")");
|
||||
if (deadServers.remove(deadServerName) == null) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
return this.deadServers.remove(deadServerName) != null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -319,7 +319,7 @@ public class ServerManager {
|
|||
*/
|
||||
void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
|
||||
Set<ServerName> liveServersFromWALDir) {
|
||||
deadServersFromPE.forEach(deadservers::add);
|
||||
deadServersFromPE.forEach(deadservers::putIfAbsent);
|
||||
liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
|
||||
.forEach(this::expireServer);
|
||||
}
|
||||
|
@ -350,6 +350,8 @@ public class ServerManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Called when RegionServer first reports in for duty and thereafter each
|
||||
* time it heartbeats to make sure it has not been figured for dead.
|
||||
* If this server is on the dead list, reject it with a YouAreDeadException.
|
||||
* If it was dead but came back with a new start code, remove the old entry
|
||||
* from the dead list.
|
||||
|
@ -358,21 +360,20 @@ public class ServerManager {
|
|||
private void checkIsDead(final ServerName serverName, final String what)
|
||||
throws YouAreDeadException {
|
||||
if (this.deadservers.isDeadServer(serverName)) {
|
||||
// host name, port and start code all match with existing one of the
|
||||
// dead servers. So, this server must be dead.
|
||||
// Exact match: host name, port and start code all match with existing one of the
|
||||
// dead servers. So, this server must be dead. Tell it to kill itself.
|
||||
String message = "Server " + what + " rejected; currently processing " +
|
||||
serverName + " as dead server";
|
||||
LOG.debug(message);
|
||||
throw new YouAreDeadException(message);
|
||||
}
|
||||
// remove dead server with same hostname and port of newly checking in rs after master
|
||||
// initialization.See HBASE-5916 for more information.
|
||||
if ((this.master == null || this.master.isInitialized())
|
||||
&& this.deadservers.cleanPreviousInstance(serverName)) {
|
||||
// Remove dead server with same hostname and port of newly checking in rs after master
|
||||
// initialization. See HBASE-5916 for more information.
|
||||
if ((this.master == null || this.master.isInitialized()) &&
|
||||
this.deadservers.cleanPreviousInstance(serverName)) {
|
||||
// This server has now become alive after we marked it as dead.
|
||||
// We removed its previous entry from the dead list to reflect it.
|
||||
LOG.debug(what + ":" + " Server " + serverName + " came back up," +
|
||||
" removed it from the dead servers list");
|
||||
LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -575,7 +576,10 @@ public class ServerManager {
|
|||
return pid;
|
||||
}
|
||||
|
||||
// Note: this is currently invoked from RPC, not just tests. Locking in this class needs cleanup.
|
||||
/**
|
||||
* Called when server has expired.
|
||||
*/
|
||||
// Locking in this class needs cleanup.
|
||||
@VisibleForTesting
|
||||
public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
|
||||
synchronized (this.onlineServers) {
|
||||
|
@ -584,7 +588,7 @@ public class ServerManager {
|
|||
// Remove the server from the known servers lists and update load info BUT
|
||||
// add to deadservers first; do this so it'll show in dead servers list if
|
||||
// not in online servers list.
|
||||
this.deadservers.add(sn);
|
||||
this.deadservers.putIfAbsent(sn);
|
||||
this.onlineServers.remove(sn);
|
||||
onlineServers.notifyAll();
|
||||
} else {
|
||||
|
@ -885,7 +889,7 @@ public class ServerManager {
|
|||
/**
|
||||
* Check if a server is known to be dead. A server can be online,
|
||||
* or known to be dead, or unknown to this manager (i.e., not online,
|
||||
* not known to be dead either. it is simply not tracked by the
|
||||
* not known to be dead either; it is simply not tracked by the
|
||||
* master any more, for example, a very old previous instance).
|
||||
*/
|
||||
public synchronized boolean isServerDead(ServerName serverName) {
|
||||
|
|
|
@ -124,7 +124,7 @@ public class ServerCrashProcedure
|
|||
// This adds server to the DeadServer processing list but not to the DeadServers list.
|
||||
// Server gets removed from processing list below on procedure successful finish.
|
||||
if (!notifiedDeadServer) {
|
||||
services.getServerManager().getDeadServers().notifyServer(serverName);
|
||||
services.getServerManager().getDeadServers().processing(serverName);
|
||||
notifiedDeadServer = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -67,20 +67,20 @@ public class TestDeadServer {
|
|||
|
||||
@Test public void testIsDead() {
|
||||
DeadServer ds = new DeadServer();
|
||||
ds.add(hostname123);
|
||||
ds.notifyServer(hostname123);
|
||||
ds.putIfAbsent(hostname123);
|
||||
ds.processing(hostname123);
|
||||
assertTrue(ds.areDeadServersInProgress());
|
||||
ds.finish(hostname123);
|
||||
assertFalse(ds.areDeadServersInProgress());
|
||||
|
||||
ds.add(hostname1234);
|
||||
ds.notifyServer(hostname1234);
|
||||
ds.putIfAbsent(hostname1234);
|
||||
ds.processing(hostname1234);
|
||||
assertTrue(ds.areDeadServersInProgress());
|
||||
ds.finish(hostname1234);
|
||||
assertFalse(ds.areDeadServersInProgress());
|
||||
|
||||
ds.add(hostname12345);
|
||||
ds.notifyServer(hostname12345);
|
||||
ds.putIfAbsent(hostname12345);
|
||||
ds.processing(hostname12345);
|
||||
assertTrue(ds.areDeadServersInProgress());
|
||||
ds.finish(hostname12345);
|
||||
assertFalse(ds.areDeadServersInProgress());
|
||||
|
@ -90,7 +90,7 @@ public class TestDeadServer {
|
|||
|
||||
final ServerName deadServer = ServerName.valueOf("127.0.0.1", 9090, 112321L);
|
||||
assertFalse(ds.cleanPreviousInstance(deadServer));
|
||||
ds.add(deadServer);
|
||||
ds.putIfAbsent(deadServer);
|
||||
assertTrue(ds.isDeadServer(deadServer));
|
||||
Set<ServerName> deadServerNames = ds.copyServerNames();
|
||||
for (ServerName eachDeadServer : deadServerNames) {
|
||||
|
@ -123,11 +123,11 @@ public class TestDeadServer {
|
|||
|
||||
DeadServer d = new DeadServer();
|
||||
|
||||
d.add(hostname123);
|
||||
d.putIfAbsent(hostname123);
|
||||
mee.incValue(1);
|
||||
d.add(hostname1234);
|
||||
d.putIfAbsent(hostname1234);
|
||||
mee.incValue(1);
|
||||
d.add(hostname12345);
|
||||
d.putIfAbsent(hostname12345);
|
||||
|
||||
List<Pair<ServerName, Long>> copy = d.copyDeadServersSince(2L);
|
||||
Assert.assertEquals(2, copy.size());
|
||||
|
@ -144,7 +144,7 @@ public class TestDeadServer {
|
|||
@Test
|
||||
public void testClean(){
|
||||
DeadServer d = new DeadServer();
|
||||
d.add(hostname123);
|
||||
d.putIfAbsent(hostname123);
|
||||
|
||||
d.cleanPreviousInstance(hostname12345);
|
||||
Assert.assertFalse(d.isEmpty());
|
||||
|
@ -159,8 +159,8 @@ public class TestDeadServer {
|
|||
@Test
|
||||
public void testClearDeadServer(){
|
||||
DeadServer d = new DeadServer();
|
||||
d.add(hostname123);
|
||||
d.add(hostname1234);
|
||||
d.putIfAbsent(hostname123);
|
||||
d.putIfAbsent(hostname1234);
|
||||
Assert.assertEquals(2, d.size());
|
||||
|
||||
d.finish(hostname123);
|
||||
|
@ -170,7 +170,7 @@ public class TestDeadServer {
|
|||
d.removeDeadServer(hostname1234);
|
||||
Assert.assertTrue(d.isEmpty());
|
||||
|
||||
d.add(hostname1234);
|
||||
d.putIfAbsent(hostname1234);
|
||||
Assert.assertFalse(d.removeDeadServer(hostname123_2));
|
||||
Assert.assertEquals(1, d.size());
|
||||
d.finish(hostname1234);
|
||||
|
|
|
@ -104,7 +104,7 @@ public class TestHBCKSCP extends TestSCPBase {
|
|||
assertEquals(RegionState.State.OPEN.toString(),
|
||||
Bytes.toString(r.getValue(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER)));
|
||||
ServerName serverName = MetaTableAccessor.getServerName(r, 0);
|
||||
assertTrue(rsServerName.equals(serverName));
|
||||
assertEquals(rsServerName, serverName);
|
||||
// moveFrom adds to dead servers and adds it to processing list only we will
|
||||
// not be processing this server 'normally'. Remove it from processing by
|
||||
// calling 'finish' and then remove it from dead servers so rsServerName
|
||||
|
@ -154,14 +154,13 @@ public class TestHBCKSCP extends TestSCPBase {
|
|||
assertNotEquals(rsServerName, serverName);
|
||||
// Make sure no mention of old server post SCP.
|
||||
assertFalse(searchMeta(master, rsServerName));
|
||||
assertFalse(master.getServerManager().getDeadServers().isProcessingServer(rsServerName));
|
||||
assertFalse(master.getServerManager().getDeadServers().isDeadServer(rsServerName));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if we find reference to <code>sn</code> in meta table.
|
||||
*/
|
||||
boolean searchMeta(HMaster master, ServerName sn) throws IOException {
|
||||
private boolean searchMeta(HMaster master, ServerName sn) throws IOException {
|
||||
List<Pair<RegionInfo, ServerName>> ps =
|
||||
MetaTableAccessor.getTableRegionsAndLocations(master.getConnection(), null);
|
||||
for (Pair<RegionInfo, ServerName> p: ps) {
|
||||
|
|
Loading…
Reference in New Issue