diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index e1d8b51d9c0..8e49137f39e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; @@ -31,6 +32,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -45,7 +47,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -193,9 +194,9 @@ public class ReplicationPeerManager { } /** - * @return the old state, and whether the peer is enabled. 
+ * @return the old description of the peer */ - Pair preTransitPeerSyncReplicationState(String peerId, + ReplicationPeerDescription preTransitPeerSyncReplicationState(String peerId, SyncReplicationState state) throws DoNotRetryIOException { ReplicationPeerDescription desc = checkPeerExists(peerId); SyncReplicationState fromState = desc.getSyncReplicationState(); @@ -204,7 +205,7 @@ public class ReplicationPeerManager { throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState + " to " + state + " for peer id=" + peerId); } - return Pair.newPair(fromState, desc.isEnabled()); + return desc; } public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) @@ -384,6 +385,16 @@ public class ReplicationPeerManager { "Only support replicated table config for sync replication peer"); } } + Path remoteWALDir = new Path(peerConfig.getRemoteWALDir()); + if (!remoteWALDir.isAbsolute()) { + throw new DoNotRetryIOException( + "The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute"); + } + URI remoteWALDirUri = remoteWALDir.toUri(); + if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() == null) { + throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir() + + " is not qualified, you must provide scheme and authority"); + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index 01752962e18..ebe7a933d78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.stream.Collectors; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; @@ -31,9 +32,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; -import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,10 +114,20 @@ public class TransitPeerSyncReplicationStateProcedure if (cpHost != null) { cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState); } - Pair pair = + ReplicationPeerDescription desc = env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState); - fromState = pair.getFirst(); - enabled = pair.getSecond(); + if (toState == SyncReplicationState.ACTIVE) { + Path remoteWALDirForPeer = + ReplicationUtils.getRemoteWALDirForPeer(desc.getPeerConfig().getRemoteWALDir(), peerId); + // check whether the remote wal directory is present + if (!remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration()) + .exists(remoteWALDirForPeer)) { + throw new DoNotRetryIOException( + "The remote WAL directory " + remoteWALDirForPeer + " does not exist"); + } + } + fromState = desc.getSyncReplicationState(); + enabled = desc.isEnabled(); } private void postTransit(MasterProcedureEnv env) throws IOException { @@ -152,6 +163,36 @@ public class TransitPeerSyncReplicationStateProcedure } } + private 
void setNextStateAfterRefreshBegin() { + if (fromState.equals(SyncReplicationState.ACTIVE)) { + setNextState(toState.equals(SyncReplicationState.STANDBY) + ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER + : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); + } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) { + setNextState(toState.equals(SyncReplicationState.STANDBY) + ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER + : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); + } else { + assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE); + setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER); + } + } + + private void setNextStateAfterRefreshEnd() { + if (toState == SyncReplicationState.STANDBY) { + setNextState( + enabled ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED + : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL); + } else { + setNextState( + PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); + } + } + + private void replayRemoteWAL() { + addChildProcedure(new RecoverStandbyProcedure[] { new RecoverStandbyProcedure(peerId) }); + } + @Override protected Flow executeFromState(MasterProcedureEnv env, PeerSyncReplicationStateTransitionState state) @@ -191,21 +232,10 @@ public class TransitPeerSyncReplicationStateProcedure addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0)) .toArray(RefreshPeerProcedure[]::new)); - if (fromState.equals(SyncReplicationState.ACTIVE)) { - setNextState(toState.equals(SyncReplicationState.STANDBY) - ? 
PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER - : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); - } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) { - setNextState(toState.equals(SyncReplicationState.STANDBY) - ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER - : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); - } else { - assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE); - setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER); - } + setNextStateAfterRefreshBegin(); return Flow.HAS_MORE_STATE; case REPLAY_REMOTE_WAL_IN_PEER: - addChildProcedure(new RecoverStandbyProcedure(peerId)); + replayRemoteWAL(); setNextState( PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE); return Flow.HAS_MORE_STATE; @@ -248,14 +278,7 @@ public class TransitPeerSyncReplicationStateProcedure addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1)) .toArray(RefreshPeerProcedure[]::new)); - if (toState == SyncReplicationState.STANDBY) { - setNextState( - enabled ? 
PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED - : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL); - } else { - setNextState( - PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); - } + setNextStateAfterRefreshEnd(); return Flow.HAS_MORE_STATE; case SYNC_REPLICATION_SET_PEER_ENABLED: try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index ac98283bf73..c6ffeea1ad4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest; import org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest; @@ -981,34 +982,37 @@ public class TestReplicationAdmin { ReplicationPeerConfig rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); assertNull(rpc.getRemoteWALDir()); + builder.setRemoteWALDir("hdfs://srv2:8888/hbase"); try { - builder.setRemoteWALDir("hdfs://srv2:8888/hbase"); hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); fail("Change remote wal dir is not allowed"); } catch (Exception e) { // OK + LOG.info("Expected error:", e); } builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_SECOND); - 
builder.setRemoteWALDir(rootDir); + builder.setRemoteWALDir("whatever"); try { hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); fail("Only support replicated table config for sync replication"); } catch (Exception e) { // OK + LOG.info("Expected error:", e); } builder.setReplicateAllUserTables(false); + Set namespaces = new HashSet(); + namespaces.add("ns1"); + builder.setNamespaces(namespaces); try { - Set namespaces = new HashSet(); - namespaces.add("ns1"); - builder.setNamespaces(namespaces); hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); fail("Only support replicated table config for sync replication"); } catch (Exception e) { // OK + LOG.info("Expected error:", e); } builder.setNamespaces(null); @@ -1017,21 +1021,41 @@ public class TestReplicationAdmin { fail("Only support replicated table config for sync replication, and tables can't be empty"); } catch (Exception e) { // OK + LOG.info("Expected error:", e); } Map> tableCfs = new HashMap<>(); + tableCfs.put(tableName, Arrays.asList("cf1")); + builder.setTableCFsMap(tableCfs); try { - tableCfs.put(tableName, Arrays.asList("cf1")); - builder.setTableCFsMap(tableCfs); hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); fail("Only support replicated table config for sync replication"); } catch (Exception e) { // OK + LOG.info("Expected error:", e); } tableCfs = new HashMap<>(); tableCfs.put(tableName, new ArrayList<>()); builder.setTableCFsMap(tableCfs); + try { + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + fail("The remote WAL dir must be absolute"); + } catch (Exception e) { + // OK + LOG.info("Expected error:", e); + } + + builder.setRemoteWALDir("/hbase/remoteWALs"); + try { + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + fail("The remote WAL dir must be qualified"); + } catch (Exception e) { + // OK + LOG.info("Expected error:", e); + } + + builder.setRemoteWALDir(rootDir); hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); rpc = 
hbaseAdmin.getReplicationPeerConfig(ID_SECOND); assertEquals(rootDir, rpc.getRemoteWALDir()); @@ -1042,6 +1066,7 @@ public class TestReplicationAdmin { fail("Change remote wal dir is not allowed"); } catch (Exception e) { // OK + LOG.info("Expected error:", e); } try { @@ -1050,6 +1075,7 @@ public class TestReplicationAdmin { fail("Change remote wal dir is not allowed"); } catch (Exception e) { // OK + LOG.info("Expected error:", e); } try { @@ -1062,6 +1088,7 @@ public class TestReplicationAdmin { "Change replicated table config on an existing synchronous peer is not allowed"); } catch (Exception e) { // OK + LOG.info("Expected error:", e); } } @@ -1079,13 +1106,13 @@ public class TestReplicationAdmin { try { hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE, SyncReplicationState.DOWNGRADE_ACTIVE); - fail("Can't transit cluster state if replication peer don't config remote wal dir"); + fail("Can't transit sync replication state if replication peer don't config remote wal dir"); } catch (Exception e) { // OK + LOG.info("Expected error:", e); } Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL"); - TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_SECOND)); builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_SECOND); builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(), @@ -1106,6 +1133,15 @@ public class TestReplicationAdmin { assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + try { + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); + fail("Can't transit sync replication state to ACTIVE if remote wal dir does not exist"); + } catch (Exception e) { + // OK + LOG.info("Expected error:", e); + } + TEST_UTIL.getTestFileSystem() + .mkdirs(ReplicationUtils.getRemoteWALDirForPeer(rootDir, ID_SECOND)); hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, 
SyncReplicationState.ACTIVE); assertEquals(SyncReplicationState.ACTIVE, hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); @@ -1133,9 +1169,10 @@ public class TestReplicationAdmin { hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); try { hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); - fail("Can't transit cluster state from STANDBY to ACTIVE"); + fail("Can't transit sync replication state from STANDBY to ACTIVE"); } catch (Exception e) { // OK + LOG.info("Expected error:", e); } hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.DOWNGRADE_ACTIVE);