From 00e54aae24d88ac9c2b84db5ea1eaf11c3e5d73c Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Tue, 6 Feb 2018 16:00:59 +0800 Subject: [PATCH] HBASE-19935 Only allow table replication for sync replication for now --- .../replication/ReplicationPeerConfig.java | 9 +++ .../replication/ReplicationPeerManager.java | 34 +++++++- .../replication/TestReplicationAdmin.java | 81 ++++++++++++------- .../wal/TestCombinedAsyncWriter.java | 6 ++ .../wal/TestSyncReplicationWALProvider.java | 6 ++ 5 files changed, 106 insertions(+), 30 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 97abc74d1ac..997a155c655 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; + +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; @@ -220,6 +222,13 @@ public class ReplicationPeerConfig { return this.remoteWALDir; } + /** + * Use remote wal dir to decide whether a peer is sync replication peer + */ + public boolean isSyncReplication() { + return !StringUtils.isBlank(this.remoteWALDir); + } + public static ReplicationPeerConfigBuilder newBuilder() { return new ReplicationPeerConfigBuilderImpl(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index f07a0d816c2..ff778a8a5b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -170,7 +170,7 @@ public class ReplicationPeerManager { " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'"); } - if (oldPeerConfig.getRemoteWALDir() != null) { + if (oldPeerConfig.isSyncReplication()) { if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) { throw new DoNotRetryIOException( "Changing the replicated namespace/table config on a synchronous replication " + @@ -199,8 +199,8 @@ public class ReplicationPeerManager { } ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); SyncReplicationState syncReplicationState = - StringUtils.isBlank(peerConfig.getRemoteWALDir()) ? SyncReplicationState.NONE - : SyncReplicationState.DOWNGRADE_ACTIVE; + copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE + : SyncReplicationState.NONE; peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState); peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState)); @@ -324,9 +324,37 @@ public class ReplicationPeerManager { peerConfig.getTableCFsMap()); } + if (peerConfig.isSyncReplication()) { + checkPeerConfigForSyncReplication(peerConfig); + } + checkConfiguredWALEntryFilters(peerConfig); } + private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig) + throws DoNotRetryIOException { + // This is used to reduce the difficulty for implementing the sync replication state transition + // as we need to reopen all the related regions. + // TODO: Add namespace, replicat_all flag back + if (peerConfig.replicateAllUserTables()) { + throw new DoNotRetryIOException( + "Only support replicated table config for sync replication peer"); + } + if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) { + throw new DoNotRetryIOException( + "Only support replicated table config for sync replication peer"); + } + if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) { + throw new DoNotRetryIOException("Need config replicated tables for sync replication peer"); + } + for (List cfs : peerConfig.getTableCFsMap().values()) { + if (cfs != null && !cfs.isEmpty()) { + throw new DoNotRetryIOException( + "Only support replicated table config for sync replication peer"); + } + } + } + /** * Set a namespace in the peer config means that all tables in this namespace will be replicated * to the peer cluster. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index a7710e7d466..d462dbd65e6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -26,6 +26,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -910,6 +911,8 @@ public class TestReplicationAdmin { @Test public void testPeerRemoteWALDir() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + String rootDir = "hdfs://srv1:9999/hbase"; ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_ONE); @@ -929,8 +932,47 @@ public class TestReplicationAdmin { builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_SECOND); builder.setRemoteWALDir(rootDir); - hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + try { + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + fail("Only support replicated table config for sync replication"); + } catch (Exception e) { + // OK + } + + builder.setReplicateAllUserTables(false); + try { + Set namespaces = new HashSet(); + namespaces.add("ns1"); + builder.setNamespaces(namespaces); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + fail("Only support replicated table config for sync replication"); + } catch (Exception e) { + // OK + } + + builder.setNamespaces(null); + try { + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + fail("Only support replicated table config for sync replication, and tables can't be empty"); + } catch (Exception e) { + // OK + } + + Map> tableCfs = new HashMap<>(); + try { + tableCfs.put(tableName, Arrays.asList("cf1")); + builder.setTableCFsMap(tableCfs); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + fail("Only support replicated table config for sync replication"); + } catch (Exception e) { + // OK + } + + tableCfs = new HashMap<>(); + tableCfs.put(tableName, new ArrayList<>()); + builder.setTableCFsMap(tableCfs); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); assertEquals(rootDir, rpc.getRemoteWALDir()); @@ -952,34 +994,12 @@ public class TestReplicationAdmin { try { builder = ReplicationPeerConfig.newBuilder(rpc); - builder.setReplicateAllUserTables(false); + tableCfs = new HashMap<>(); + tableCfs.put(TableName.valueOf("ns1:" + name.getMethodName()), new ArrayList<>()); + builder.setTableCFsMap(tableCfs); hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); fail( - "Change replicated namespace/table config on an existing synchronous peer is not allowed"); - } catch (Exception e) { - // OK - } - - try { - builder = ReplicationPeerConfig.newBuilder(rpc); - Set namespaces = new HashSet<>(); - namespaces.add("ns1"); - builder.setExcludeNamespaces(namespaces); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail( - "Change replicated namespace/table config on an existing synchronous peer is not allowed"); - } catch (Exception e) { - // OK - } - - try { - builder = ReplicationPeerConfig.newBuilder(rpc); - Map> tableCfs = new HashMap<>(); - tableCfs.put(TableName.valueOf(name.getMethodName()), new ArrayList<>()); - builder.setExcludeTableCFsMap(tableCfs); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail( - "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + "Change replicated table config on an existing synchronous peer is not allowed"); } catch (Exception e) { // OK } @@ -987,8 +1007,11 @@ public class TestReplicationAdmin { @Test public void testTransitSyncReplicationPeerState() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_ONE); + builder.setReplicateAllUserTables(false); hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); assertEquals(SyncReplicationState.NONE, hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE)); @@ -1005,6 +1028,10 @@ public class TestReplicationAdmin { builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_SECOND); builder.setRemoteWALDir(rootDir); + builder.setReplicateAllUserTables(false); + Map> tableCfs = new HashMap<>(); + tableCfs.put(tableName, new ArrayList<>()); + builder.setTableCFsMap(tableCfs); hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java index 36dbe0fc76c..07aa6a85f6a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter; import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -51,6 +53,10 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; @Category({ RegionServerTests.class, MediumTests.class }) public class TestCombinedAsyncWriter { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static EventLoopGroup EVENT_LOOP_GROUP; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java index 60a9e13d504..f09e51ef907 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertThat; import java.io.IOException; import java.util.Optional; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; @@ -41,12 +42,17 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @Category({ RegionServerTests.class, MediumTests.class }) public class TestSyncReplicationWALProvider { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static String PEER_ID = "1";