HBASE-25282 Remove processingServers in DeadServer as we can get this… (#2657)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
parent 984d578dd4
commit 403756ade5
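The heart of the change: DeadServer no longer maintains a separate processingServers set (populated by processing()/finish() calls from ServerCrashProcedure). Whether a crashed server is still being handled is now derived directly from the master's procedure executor, which survives master restarts. A minimal sketch of that pattern, built only from the getProcedures()/ServerCrashProcedure calls visible in the hunks below; the helper class and method names here are illustrative and not part of the commit:

import java.io.IOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;

final class CrashProcessingCheck {
  private CrashProcessingCheck() {}

  // True while any ServerCrashProcedure is still tracked by the procedure executor.
  static boolean anyCrashInProgress(MasterServices master) throws IOException {
    return master.getProcedures().stream().anyMatch(p -> p instanceof ServerCrashProcedure);
  }

  // True while a ServerCrashProcedure for this particular server is still tracked.
  static boolean crashInProgress(MasterServices master, ServerName server) throws IOException {
    return master.getProcedures().stream().anyMatch(p -> p instanceof ServerCrashProcedure
      && ((ServerCrashProcedure) p).getServerName().equals(server));
  }
}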
DeadServer.java

@@ -33,9 +33,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-
-
 /**
  * Class to hold dead servers list and utility querying dead server list.
  * Servers are added when they expire or when we find them in filesystem on startup.
@@ -59,13 +56,6 @@ public class DeadServer {
    */
   private final Map<ServerName, Long> deadServers = new HashMap<>();
 
-  /**
-   * Set of dead servers currently being processed by a SCP.
-   * Added to this list at the start of SCP and removed after it is done
-   * processing the crash.
-   */
-  private final Set<ServerName> processingServers = new HashSet<>();
-
   /**
    * @param serverName server name.
    * @return true if this server is on the dead servers list false otherwise
@@ -74,17 +64,6 @@ public class DeadServer {
     return deadServers.containsKey(serverName);
   }
 
-  /**
-   * Checks if there are currently any dead servers being processed by the
-   * master. Returns true if at least one region server is currently being
-   * processed as dead.
-   *
-   * @return true if any RS are being processed as dead
-   */
-  synchronized boolean areDeadServersInProgress() {
-    return !processingServers.isEmpty();
-  }
-
   public synchronized Set<ServerName> copyServerNames() {
     Set<ServerName> clone = new HashSet<>(deadServers.size());
     clone.addAll(deadServers.keySet());
@@ -96,29 +75,6 @@ public class DeadServer {
    */
   synchronized void putIfAbsent(ServerName sn) {
     this.deadServers.putIfAbsent(sn, EnvironmentEdgeManager.currentTime());
-    processing(sn);
-  }
-
-  /**
-   * Add <code>sn<</code> to set of processing deadservers.
-   * @see #finish(ServerName)
-   */
-  public synchronized void processing(ServerName sn) {
-    if (processingServers.add(sn)) {
-      // Only log on add.
-      LOG.debug("Processing {}; numProcessing={}", sn, processingServers.size());
-    }
-  }
-
-  /**
-   * Complete processing for this dead server.
-   * @param sn ServerName for the dead server.
-   * @see #processing(ServerName)
-   */
-  public synchronized void finish(ServerName sn) {
-    if (processingServers.remove(sn)) {
-      LOG.debug("Removed {} from processing; numProcessing={}", sn, processingServers.size());
-    }
   }
 
   public synchronized int size() {
@@ -179,17 +135,12 @@ public class DeadServer {
     // Display unified set of servers from both maps
     Set<ServerName> servers = new HashSet<>();
     servers.addAll(deadServers.keySet());
-    servers.addAll(processingServers);
     StringBuilder sb = new StringBuilder();
     for (ServerName sn : servers) {
       if (sb.length() > 0) {
         sb.append(", ");
       }
       sb.append(sn.toString());
-      // Star entries that are being processed
-      if (processingServers.contains(sn)) {
-        sb.append("*");
-      }
     }
     return sb.toString();
   }
@@ -228,9 +179,6 @@ public class DeadServer {
    * @return true if this server was removed
    */
  public synchronized boolean removeDeadServer(final ServerName deadServerName) {
-    Preconditions.checkState(!processingServers.contains(deadServerName),
-      "Asked to remove server still in processingServers set " + deadServerName +
-        " (numProcessing=" + processingServers.size() + ")");
     return this.deadServers.remove(deadServerName) != null;
   }
 }
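With the processing-state methods gone, DeadServer is reduced to a map of server name to death timestamp. A small same-package sketch of the surviving surface, modeled on the TestDeadServer usage further down in this diff; the sketch class and the example ServerName are made up for illustration, and it sits in org.apache.hadoop.hbase.master because putIfAbsent is package-private:

package org.apache.hadoop.hbase.master;

import org.apache.hadoop.hbase.ServerName;

public class DeadServerUsageSketch {
  public static void main(String[] args) {
    DeadServer ds = new DeadServer();
    ServerName sn = ServerName.valueOf("127.0.0.1", 9090, 112321L);

    ds.putIfAbsent(sn);                       // record the server with its death timestamp
    System.out.println(ds.isDeadServer(sn));  // true: still on the dead list
    System.out.println(ds.copyServerNames()); // copy of the current dead set
    System.out.println(ds.size());            // 1

    // removeDeadServer no longer has a Preconditions guard against the
    // (now removed) processingServers set; it simply drops the map entry.
    System.out.println(ds.removeDeadServer(sn)); // true: it was present
    System.out.println(ds.isEmpty());            // true
  }
}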
MasterRpcServices.java

@@ -2440,6 +2440,15 @@ public class MasterRpcServices extends RSRpcServices implements
       Set<Address> clearedServers = new HashSet<>();
       for (HBaseProtos.ServerName pbServer : request.getServerNameList()) {
         ServerName server = ProtobufUtil.toServerName(pbServer);
+
+        final boolean deadInProcess = master.getProcedures().stream().anyMatch(
+          p -> (p instanceof ServerCrashProcedure)
+            && ((ServerCrashProcedure) p).getServerName().equals(server));
+        if (deadInProcess) {
+          throw new ServiceException(
+            String.format("Dead server '%s' is not 'dead' in fact...", server));
+        }
+
         if (!deadServer.removeDeadServer(server)) {
           response.addServerName(pbServer);
         } else {
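The clearDeadServers RPC now refuses to drop a server whose ServerCrashProcedure is still running, replacing the Preconditions guard that used to live in DeadServer.removeDeadServer. A hedged client-side sketch, assuming the Admin#clearDeadServers API; the connection setup and host name are illustrative only:

import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ClearDeadServerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      ServerName dead = ServerName.valueOf("rs1.example.com", 16020, 1L);
      // Returns the servers that could NOT be cleared. If a ServerCrashProcedure
      // for 'dead' is still running, the call is expected to fail with an
      // IOException carrying the ServiceException thrown in the hunk above.
      List<ServerName> notCleared = admin.clearDeadServers(Collections.singletonList(dead));
      System.out.println("Not cleared: " + notCleared);
    }
  }
}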
ServerManager.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -503,8 +504,8 @@ public class ServerManager {
    * Checks if any dead servers are currently in progress.
    * @return true if any RS are being processed as dead, false if not
    */
-  public boolean areDeadServersInProgress() {
-    return this.deadservers.areDeadServersInProgress();
+  public boolean areDeadServersInProgress() throws IOException {
+    return master.getProcedures().stream().anyMatch(p -> p instanceof ServerCrashProcedure);
   }
 
   void letRegionServersShutdown() {
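Since areDeadServersInProgress() now consults the procedure executor (and therefore throws IOException), callers that poll it look like the waitOnCrashProcessing() test helper in TestRegionRebalancing below. A self-contained sketch of such a wait loop; the helper name and the timeout handling are illustrative additions, not part of the commit:

import java.io.IOException;
import org.apache.hadoop.hbase.master.HMaster;

public final class WaitForCrashProcessing {
  private WaitForCrashProcessing() {}

  // Block until no ServerCrashProcedure is running, or the timeout elapses.
  public static void await(HMaster master, long timeoutMs)
      throws IOException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (master.getServerManager().areDeadServersInProgress()) {
      if (System.currentTimeMillis() > deadline) {
        throw new IOException("Timed out waiting for ServerCrashProcedures to finish");
      }
      Thread.sleep(1000);
    }
  }
}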
ServerCrashProcedure.java

@@ -122,7 +122,6 @@ public class ServerCrashProcedure
       // This adds server to the DeadServer processing list but not to the DeadServers list.
       // Server gets removed from processing list below on procedure successful finish.
       if (!notifiedDeadServer) {
-        services.getServerManager().getDeadServers().processing(serverName);
         notifiedDeadServer = true;
       }
 
@@ -230,7 +229,6 @@ public class ServerCrashProcedure
       case SERVER_CRASH_FINISH:
         LOG.info("removed crashed server {} after splitting done", serverName);
         services.getAssignmentManager().getRegionStates().removeServer(serverName);
-        services.getServerManager().getDeadServers().finish(serverName);
         updateProgress(true);
         return Flow.NO_MORE_STATE;
       default:
TestRegionRebalancing.java

@@ -172,7 +172,7 @@ public class TestRegionRebalancing {
   /**
    * Wait on crash processing. Balancer won't run if processing a crashed server.
    */
-  private void waitOnCrashProcessing() {
+  private void waitOnCrashProcessing() throws IOException {
     while (UTIL.getHBaseCluster().getMaster().getServerManager().areDeadServersInProgress()) {
       LOG.info("Waiting on processing of crashed server before proceeding...");
       Threads.sleep(1000);
TestDeadServer.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -68,22 +69,10 @@ public class TestDeadServer {
   @Test public void testIsDead() {
     DeadServer ds = new DeadServer();
     ds.putIfAbsent(hostname123);
-    ds.processing(hostname123);
-    assertTrue(ds.areDeadServersInProgress());
-    ds.finish(hostname123);
-    assertFalse(ds.areDeadServersInProgress());
 
     ds.putIfAbsent(hostname1234);
-    ds.processing(hostname1234);
-    assertTrue(ds.areDeadServersInProgress());
-    ds.finish(hostname1234);
-    assertFalse(ds.areDeadServersInProgress());
 
     ds.putIfAbsent(hostname12345);
-    ds.processing(hostname12345);
-    assertTrue(ds.areDeadServersInProgress());
-    ds.finish(hostname12345);
-    assertFalse(ds.areDeadServersInProgress());
 
     // Already dead = 127.0.0.1,9090,112321
     // Coming back alive = 127.0.0.1,9090,223341
@@ -104,7 +93,7 @@ public class TestDeadServer {
   }
 
   @Test
-  public void testCrashProcedureReplay() {
+  public void testCrashProcedureReplay() throws IOException {
     HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
     final ProcedureExecutor<MasterProcedureEnv> pExecutor = master.getMasterProcedureExecutor();
     ServerCrashProcedure proc = new ServerCrashProcedure(
@@ -112,7 +101,7 @@ public class TestDeadServer {
 
     ProcedureTestingUtility.submitAndWait(pExecutor, proc);
 
-    assertFalse(master.getServerManager().getDeadServers().areDeadServersInProgress());
+    assertTrue(master.getServerManager().areDeadServersInProgress());
   }
 
   @Test
@@ -163,17 +152,14 @@ public class TestDeadServer {
     d.putIfAbsent(hostname1234);
     Assert.assertEquals(2, d.size());
 
-    d.finish(hostname123);
     d.removeDeadServer(hostname123);
     Assert.assertEquals(1, d.size());
-    d.finish(hostname1234);
     d.removeDeadServer(hostname1234);
     Assert.assertTrue(d.isEmpty());
 
     d.putIfAbsent(hostname1234);
     Assert.assertFalse(d.removeDeadServer(hostname123_2));
     Assert.assertEquals(1, d.size());
-    d.finish(hostname1234);
     Assert.assertTrue(d.removeDeadServer(hostname1234));
     Assert.assertTrue(d.isEmpty());
   }
TestRollingRestart.java

@@ -221,7 +221,7 @@ public class TestRollingRestart {
   }
 
   private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster,
-      ServerName serverName) throws InterruptedException {
+      ServerName serverName) throws InterruptedException, IOException {
     ServerManager sm = activeMaster.getMaster().getServerManager();
     // First wait for it to be in dead list
     while (!sm.getDeadServers().isDeadServer(serverName)) {
TestHBCKSCP.java

@@ -110,7 +110,6 @@ public class TestHBCKSCP extends TestSCPBase {
     // calling 'finish' and then remove it from dead servers so rsServerName
     // becomes an 'Unknown Server' even though it is still around.
     master.getServerManager().moveFromOnlineToDeadServers(rsServerName);
-    master.getServerManager().getDeadServers().finish(rsServerName);
     master.getServerManager().getDeadServers().removeDeadServer(rsServerName);
     master.getAssignmentManager().getRegionStates().removeServer(rsServerName);
     // Kill the server. Nothing should happen since an 'Unknown Server' as far
TestSyncReplicationStandbyKillRS.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
@@ -101,7 +102,7 @@ public class TestSyncReplicationStandbyKillRS extends SyncReplicationTestBase {
   }
 
   private void waitForRSShutdownToStartAndFinish(JVMClusterUtil.MasterThread activeMaster,
-      ServerName serverName) throws InterruptedException {
+      ServerName serverName) throws InterruptedException, IOException {
     ServerManager sm = activeMaster.getMaster().getServerManager();
     // First wait for it to be in dead list
     while (!sm.getDeadServers().isDeadServer(serverName)) {