diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java index 5a35c3fbc10..28e71bfd6f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; import java.util.Collections; +import java.util.Iterator; import java.util.List; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -29,8 +30,10 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,13 +83,26 @@ public class ClaimReplicationQueuesProcedure extends Procedure queues = storage.getAllQueues(crashedServer); + // this is for upgrading to the new region replication framework, where we will delete the + // legacy region_replica_replication peer directly, without deleting the replication queues, + // as it may still be used by region servers which have not been upgraded yet. + for (Iterator iter = queues.iterator(); iter.hasNext();) { + ReplicationQueueInfo queue = new ReplicationQueueInfo(iter.next()); + if (queue.getPeerId().equals(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER)) { + LOG.info("Found replication queue {} for legacy region replication peer, " + + "skipping claiming and removing...", queue.getQueueId()); + iter.remove(); + storage.removeQueue(crashedServer, queue.getQueueId()); + } + } if (queues.isEmpty()) { LOG.debug("Finish claiming replication queues for {}", crashedServer); storage.removeReplicatorIfQueueIsEmpty(crashedServer); // we are done return null; } - LOG.debug("There are {} replication queues need to be claimed for {}", queues.size(), + LOG.debug( + "There are {} replication queues need to be claimed for {}", queues.size(), crashedServer); List targetServers = env.getMasterServices().getServerManager().getOnlineServersList(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index b62d4b42836..176eb21653a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -553,7 +553,7 @@ public class ReplicationPeerManager { if (ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME .equals(peerConfig.getReplicationEndpointImpl())) { // we do not use this endpoint for region replication any more, see HBASE-26233 - LOG.warn("Legacy region replication peer found, removing: {}", peerConfig); + LOG.info("Legacy region replication peer found, removing: {}", peerConfig); peerStorage.removePeer(peerId); continue; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 7a16e3591b4..ddb25f4ef68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -347,7 +347,7 @@ public class ReplicationSourceManager { if (ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME .equals(peer.getPeerConfig().getReplicationEndpointImpl())) { // we do not use this endpoint for region replication any more, see HBASE-26233 - LOG.warn("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig()); + LOG.info("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig()); return; } ReplicationSourceInterface src = createSource(peerId, peer); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java index 6a46bd46ce6..1844be641a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -50,6 +50,12 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { = "hbase.region.replica.replication.enabled"; private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false; + /** + * @deprecated Since 3.0.0, leave here only for implementing compatibility code. + */ + @Deprecated + public static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication"; + /** * Same as for {@link #REGION_REPLICA_REPLICATION_CONF_KEY} but for catalog replication. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java index 2c75d373c74..a550ecd1291 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java @@ -25,11 +25,17 @@ import java.io.IOException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -63,16 +69,22 @@ public class TestStartupWithLegacyRegionReplicationEndpoint { ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() .setClusterKey("127.0.0.1:2181:/hbase") .setReplicationEndpointImpl(ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME).build(); + SingleProcessHBaseCluster cluster = UTIL.getMiniHBaseCluster(); + HMaster master = cluster.getMaster(); // can not use Admin.addPeer as it will fail with ClassNotFound - UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().addPeer("legacy", peerConfig, - true); - UTIL.getMiniHBaseCluster().stopRegionServer(0); - RegionServerThread rst = UTIL.getMiniHBaseCluster().startRegionServer(); + master.getReplicationPeerManager().addPeer("legacy", peerConfig, true); + // add a wal file to the queue + ServerName rsName = cluster.getRegionServer(0).getServerName(); + master.getReplicationPeerManager().getQueueStorage().addWAL(rsName, + ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER, "test-wal-file"); + cluster.stopRegionServer(0); + RegionServerThread rst = cluster.startRegionServer(); // we should still have this peer assertNotNull(UTIL.getAdmin().getReplicationPeerConfig("legacy")); // but at RS side, we should not have this peer loaded as replication source assertTrue(rst.getRegionServer().getReplicationSourceService().getReplicationManager() .getSources().isEmpty()); + UTIL.shutdownMiniHBaseCluster(); UTIL.restartHBaseCluster(1); // now we should have removed the peer @@ -81,5 +93,13 @@ public class TestStartupWithLegacyRegionReplicationEndpoint { // at rs side, we should not have the peer this time, not only for not having replication source assertTrue(UTIL.getMiniHBaseCluster().getRegionServer(0).getReplicationSourceService() .getReplicationManager().getReplicationPeers().getAllPeerIds().isEmpty()); + + // make sure that we can finish the SCP and delete the test-wal-file + UTIL.waitFor(15000, + () -> UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream() + .filter(p -> p instanceof ServerCrashProcedure).map(p -> (ServerCrashProcedure) p) + .allMatch(Procedure::isSuccess)); + assertTrue(UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage() + .getAllQueues(rsName).isEmpty()); } }