HBASE-27430 Should disable replication log cleaner when migrating replication queue data (#4901)
Signed-off-by: Liangjun He <heliangjun@apache.org>
This commit is contained in:
parent
dfb125f3b2
commit
7c74f9e8c5
|
@ -724,11 +724,13 @@ message AssignReplicationQueuesStateData {
|
|||
}
|
||||
|
||||
enum MigrateReplicationQueueFromZkToTableState {
|
||||
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1;
|
||||
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2;
|
||||
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3;
|
||||
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4;
|
||||
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5;
|
||||
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER = 1;
|
||||
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 2;
|
||||
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 3;
|
||||
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 4;
|
||||
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 5;
|
||||
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 6;
|
||||
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER = 7;
|
||||
}
|
||||
|
||||
message MigrateReplicationQueueFromZkToTableStateData {
|
||||
|
|
|
@ -17,7 +17,9 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER;
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER;
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER;
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE;
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
|
||||
|
@ -111,6 +113,26 @@ public class MigrateReplicationQueueFromZkToTableProcedure
|
|||
}
|
||||
}
|
||||
|
||||
private void disableReplicationLogCleaner(MasterProcedureEnv env)
|
||||
throws ProcedureSuspendedException {
|
||||
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
|
||||
// it is not likely that we can reach here as we will schedule this procedure immediately
|
||||
// after master restarting, where ReplicationLogCleaner should have not started its first run
|
||||
// yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since
|
||||
// there will be no data in the new replication queue storage before we execute this procedure
|
||||
// so ReplicationLogCleaner will quit immediately without doing anything.
|
||||
throw suspend(env.getMasterConfiguration(),
|
||||
backoff -> LOG.info(
|
||||
"Can not disable replication log cleaner, sleep {} secs and retry later",
|
||||
backoff / 1000));
|
||||
}
|
||||
resetRetry();
|
||||
}
|
||||
|
||||
private void enableReplicationLogCleaner(MasterProcedureEnv env) {
|
||||
env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
|
||||
}
|
||||
|
||||
private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException {
|
||||
long peerProcCount;
|
||||
try {
|
||||
|
@ -136,6 +158,10 @@ public class MigrateReplicationQueueFromZkToTableProcedure
|
|||
MigrateReplicationQueueFromZkToTableState state)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
switch (state) {
|
||||
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER:
|
||||
disableReplicationLogCleaner(env);
|
||||
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE:
|
||||
waitUntilNoPeerProcedure(env);
|
||||
List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null);
|
||||
|
@ -152,7 +178,8 @@ public class MigrateReplicationQueueFromZkToTableProcedure
|
|||
"failed to delete old replication queue data, sleep {} secs and retry later",
|
||||
backoff / 1000, e));
|
||||
}
|
||||
return Flow.NO_MORE_STATE;
|
||||
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
}
|
||||
// here we do not care the peers which have already been disabled, as later we do not need
|
||||
// to enable them
|
||||
|
@ -232,6 +259,10 @@ public class MigrateReplicationQueueFromZkToTableProcedure
|
|||
for (String peerId : disabledPeerIds) {
|
||||
addChildProcedure(new EnablePeerProcedure(peerId));
|
||||
}
|
||||
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER:
|
||||
enableReplicationLogCleaner(env);
|
||||
return Flow.NO_MORE_STATE;
|
||||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
|
@ -263,7 +294,19 @@ public class MigrateReplicationQueueFromZkToTableProcedure
|
|||
|
||||
@Override
|
||||
protected MigrateReplicationQueueFromZkToTableState getInitialState() {
|
||||
return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
|
||||
return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterReplay(MasterProcedureEnv env) {
|
||||
if (getCurrentState() == getInitialState()) {
|
||||
// do not need to disable log cleaner or acquire lock if we are in the initial state, later
|
||||
// when executing the procedure we will try to disable and acquire.
|
||||
return;
|
||||
}
|
||||
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
|
||||
throw new IllegalStateException("can not disable log cleaner, this should not happen");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,8 +17,11 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -48,6 +51,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
|||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -102,6 +106,8 @@ public class TestMigrateReplicationQueueFromZkToTableProcedure {
|
|||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
// one hour, to make sure it will not run during the test
|
||||
UTIL.getConfiguration().setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 60 * 60 * 1000);
|
||||
UTIL.startMiniCluster(
|
||||
StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build());
|
||||
}
|
||||
|
@ -193,8 +199,10 @@ public class TestMigrateReplicationQueueFromZkToTableProcedure {
|
|||
UTIL.waitFor(30000, () -> proc.isSuccess());
|
||||
}
|
||||
|
||||
// make sure we will disable replication peers while migrating
|
||||
// and also tests disable/enable replication log cleaner and wait for region server upgrading
|
||||
@Test
|
||||
public void testDisablePeerAndWaitUpgrading() throws Exception {
|
||||
public void testDisablePeerAndWaitStates() throws Exception {
|
||||
String peerId = "2";
|
||||
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
|
||||
.setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase")
|
||||
|
@ -206,11 +214,22 @@ public class TestMigrateReplicationQueueFromZkToTableProcedure {
|
|||
EXTRA_REGION_SERVERS
|
||||
.put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics);
|
||||
|
||||
ReplicationLogCleanerBarrier barrier = UTIL.getHBaseCluster().getMaster()
|
||||
.getReplicationPeerManager().getReplicationLogCleanerBarrier();
|
||||
assertTrue(barrier.start());
|
||||
|
||||
ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
|
||||
MigrateReplicationQueueFromZkToTableProcedure proc =
|
||||
new MigrateReplicationQueueFromZkToTableProcedure();
|
||||
procExec.submitProcedure(proc);
|
||||
|
||||
Thread.sleep(5000);
|
||||
// make sure we are still waiting for replication log cleaner quit
|
||||
assertEquals(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER.getNumber(),
|
||||
proc.getCurrentStateId());
|
||||
barrier.stop();
|
||||
|
||||
// wait until we reach the wait upgrading state
|
||||
UTIL.waitFor(30000,
|
||||
() -> proc.getCurrentStateId()
|
||||
|
@ -218,9 +237,17 @@ public class TestMigrateReplicationQueueFromZkToTableProcedure {
|
|||
&& proc.getState() == ProcedureState.WAITING_TIMEOUT);
|
||||
// make sure the peer is disabled for migrating
|
||||
assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
|
||||
// make sure the replication log cleaner is disabled
|
||||
assertFalse(barrier.start());
|
||||
|
||||
// the procedure should finish successfully
|
||||
EXTRA_REGION_SERVERS.clear();
|
||||
UTIL.waitFor(30000, () -> proc.isSuccess());
|
||||
|
||||
// make sure the peer is enabled again
|
||||
assertTrue(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
|
||||
// make sure the replication log cleaner is enabled again
|
||||
assertTrue(barrier.start());
|
||||
barrier.stop();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue