HBASE-26538 Should find a way to clear the replication queue for a legacy region_replica_replication peer (#3918)

Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
Duo Zhang 2021-12-06 23:42:25 +08:00
parent a62e071bf7
commit 1fdd0a4cfd
5 changed files with 49 additions and 7 deletions

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@ -29,8 +30,10 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -80,13 +83,26 @@ public class ClaimReplicationQueuesProcedure extends Procedure<MasterProcedureEn
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
try {
List<String> queues = storage.getAllQueues(crashedServer);
// this is for upgrading to the new region replication framework, where we will delete the
// legacy region_replica_replication peer directly, without deleting the replication queues,
// as it may still be used by region servers which have not been upgraded yet.
for (Iterator<String> iter = queues.iterator(); iter.hasNext();) {
ReplicationQueueInfo queue = new ReplicationQueueInfo(iter.next());
if (queue.getPeerId().equals(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER)) {
LOG.info("Found replication queue {} for legacy region replication peer, " +
"skipping claiming and removing...", queue.getQueueId());
iter.remove();
storage.removeQueue(crashedServer, queue.getQueueId());
}
}
if (queues.isEmpty()) {
LOG.debug("Finish claiming replication queues for {}", crashedServer);
storage.removeReplicatorIfQueueIsEmpty(crashedServer);
// we are done
return null;
}
LOG.debug("There are {} replication queues need to be claimed for {}", queues.size(),
LOG.debug(
"There are {} replication queues need to be claimed for {}", queues.size(),
crashedServer);
List<ServerName> targetServers =
env.getMasterServices().getServerManager().getOnlineServersList();

View File

@ -553,7 +553,7 @@ public class ReplicationPeerManager {
if (ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
.equals(peerConfig.getReplicationEndpointImpl())) {
// we do not use this endpoint for region replication any more, see HBASE-26233
LOG.warn("Legacy region replication peer found, removing: {}", peerConfig);
LOG.info("Legacy region replication peer found, removing: {}", peerConfig);
peerStorage.removePeer(peerId);
continue;
}

View File

@ -347,7 +347,7 @@ public class ReplicationSourceManager {
if (ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
.equals(peer.getPeerConfig().getReplicationEndpointImpl())) {
// we do not use this endpoint for region replication any more, see HBASE-26233
LOG.warn("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig());
LOG.info("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig());
return;
}
ReplicationSourceInterface src = createSource(peerId, peer);

View File

@ -50,6 +50,12 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
= "hbase.region.replica.replication.enabled";
private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;
/**
* @deprecated Since 3.0.0, leave here only for implementing compatibility code.
*/
@Deprecated
public static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
/**
* Same as for {@link #REGION_REPLICA_REPLICATION_CONF_KEY} but for catalog replication.
*/

View File

@ -25,11 +25,17 @@ import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@ -63,16 +69,22 @@ public class TestStartupWithLegacyRegionReplicationEndpoint {
ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey("127.0.0.1:2181:/hbase")
.setReplicationEndpointImpl(ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME).build();
SingleProcessHBaseCluster cluster = UTIL.getMiniHBaseCluster();
HMaster master = cluster.getMaster();
// can not use Admin.addPeer as it will fail with ClassNotFound
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().addPeer("legacy", peerConfig,
true);
UTIL.getMiniHBaseCluster().stopRegionServer(0);
RegionServerThread rst = UTIL.getMiniHBaseCluster().startRegionServer();
master.getReplicationPeerManager().addPeer("legacy", peerConfig, true);
// add a wal file to the queue
ServerName rsName = cluster.getRegionServer(0).getServerName();
master.getReplicationPeerManager().getQueueStorage().addWAL(rsName,
ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER, "test-wal-file");
cluster.stopRegionServer(0);
RegionServerThread rst = cluster.startRegionServer();
// we should still have this peer
assertNotNull(UTIL.getAdmin().getReplicationPeerConfig("legacy"));
// but at RS side, we should not have this peer loaded as replication source
assertTrue(rst.getRegionServer().getReplicationSourceService().getReplicationManager()
.getSources().isEmpty());
UTIL.shutdownMiniHBaseCluster();
UTIL.restartHBaseCluster(1);
// now we should have removed the peer
@ -81,5 +93,13 @@ public class TestStartupWithLegacyRegionReplicationEndpoint {
// at rs side, we should not have the peer this time, not only for not having replication source
assertTrue(UTIL.getMiniHBaseCluster().getRegionServer(0).getReplicationSourceService()
.getReplicationManager().getReplicationPeers().getAllPeerIds().isEmpty());
// make sure that we can finish the SCP and delete the test-wal-file
UTIL.waitFor(15000,
() -> UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream()
.filter(p -> p instanceof ServerCrashProcedure).map(p -> (ServerCrashProcedure) p)
.allMatch(Procedure::isSuccess));
assertTrue(UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage()
.getAllQueues(rsName).isEmpty());
}
}