diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index b6f5d7e50bb..14d07c17c88 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -724,11 +724,13 @@ message AssignReplicationQueuesStateData { } enum MigrateReplicationQueueFromZkToTableState { - MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1; - MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2; - MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3; - MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4; - MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5; + MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER = 1; + MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 2; + MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 3; + MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 4; + MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 5; + MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 6; + MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER = 7; } message MigrateReplicationQueueFromZkToTableStateData { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index 93ff27db3f7..b7c4e33ef85 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.master.replication; +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER; import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER; +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER; import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER; import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE; import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE; @@ -111,6 +113,26 @@ public class MigrateReplicationQueueFromZkToTableProcedure } } + private void disableReplicationLogCleaner(MasterProcedureEnv env) + throws ProcedureSuspendedException { + if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) { + // it is not likely that we can reach here as we will schedule this procedure immediately + // after master restarting, where ReplicationLogCleaner should have not started its first run + // yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since + // there will be no data in the new replication queue storage before we execute this procedure + // so ReplicationLogCleaner will quit immediately without doing anything. + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.info( + "Can not disable replication log cleaner, sleep {} secs and retry later", + backoff / 1000)); + } + resetRetry(); + } + + private void enableReplicationLogCleaner(MasterProcedureEnv env) { + env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable(); + } + private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException { long peerProcCount; try { @@ -136,6 +158,10 @@ public class MigrateReplicationQueueFromZkToTableProcedure MigrateReplicationQueueFromZkToTableState state) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { switch (state) { + case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER: + disableReplicationLogCleaner(env); + setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE); + return Flow.HAS_MORE_STATE; case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE: waitUntilNoPeerProcedure(env); List peers = env.getReplicationPeerManager().listPeers(null); @@ -152,7 +178,8 @@ public class MigrateReplicationQueueFromZkToTableProcedure "failed to delete old replication queue data, sleep {} secs and retry later", backoff / 1000, e)); } - return Flow.NO_MORE_STATE; + setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER); + return Flow.HAS_MORE_STATE; } // here we do not care the peers which have already been disabled, as later we do not need // to enable them @@ -232,6 +259,10 @@ public class MigrateReplicationQueueFromZkToTableProcedure for (String peerId : disabledPeerIds) { addChildProcedure(new EnablePeerProcedure(peerId)); } + setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER); + return Flow.HAS_MORE_STATE; + case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER: + enableReplicationLogCleaner(env); return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException("unhandled state=" + state); @@ -263,7 +294,19 @@ public class MigrateReplicationQueueFromZkToTableProcedure @Override protected MigrateReplicationQueueFromZkToTableState getInitialState() { - return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE; + return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER; + } + + @Override + protected void afterReplay(MasterProcedureEnv env) { + if (getCurrentState() == getInitialState()) { + // do not need to disable log cleaner or acquire lock if we are in the initial state, later + // when executing the procedure we will try to disable and acquire. + return; + } + if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) { + throw new IllegalStateException("can not disable log cleaner, this should not happen"); + } } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java index 752abc380b8..cb795edcd62 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.hbase.master.replication; +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER; import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -48,6 +51,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -102,6 +106,8 @@ public class TestMigrateReplicationQueueFromZkToTableProcedure { @BeforeClass public static void setupCluster() throws Exception { + // one hour, to make sure it will not run during the test + UTIL.getConfiguration().setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 60 * 60 * 1000); UTIL.startMiniCluster( StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build()); } @@ -193,8 +199,10 @@ public class TestMigrateReplicationQueueFromZkToTableProcedure { UTIL.waitFor(30000, () -> proc.isSuccess()); } + // make sure we will disable replication peers while migrating + // and also tests disable/enable replication log cleaner and wait for region server upgrading @Test - public void testDisablePeerAndWaitUpgrading() throws Exception { + public void testDisablePeerAndWaitStates() throws Exception { String peerId = "2"; ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase") @@ -206,11 +214,22 @@ public class TestMigrateReplicationQueueFromZkToTableProcedure { EXTRA_REGION_SERVERS .put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics); + ReplicationLogCleanerBarrier barrier = UTIL.getHBaseCluster().getMaster() + .getReplicationPeerManager().getReplicationLogCleanerBarrier(); + assertTrue(barrier.start()); + ProcedureExecutor procExec = getMasterProcedureExecutor(); MigrateReplicationQueueFromZkToTableProcedure proc = new MigrateReplicationQueueFromZkToTableProcedure(); procExec.submitProcedure(proc); + + Thread.sleep(5000); + // make sure we are still waiting for replication log cleaner quit + assertEquals(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER.getNumber(), + proc.getCurrentStateId()); + barrier.stop(); + // wait until we reach the wait upgrading state UTIL.waitFor(30000, () -> proc.getCurrentStateId() @@ -218,9 +237,17 @@ public class TestMigrateReplicationQueueFromZkToTableProcedure { && proc.getState() == ProcedureState.WAITING_TIMEOUT); // make sure the peer is disabled for migrating assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId)); + // make sure the replication log cleaner is disabled + assertFalse(barrier.start()); // the procedure should finish successfully EXTRA_REGION_SERVERS.clear(); UTIL.waitFor(30000, () -> proc.isSuccess()); + + // make sure the peer is enabled again + assertTrue(UTIL.getAdmin().isReplicationPeerEnabled(peerId)); + // make sure the replication log cleaner is enabled again + assertTrue(barrier.start()); + barrier.stop(); } }