diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 32739b1b431..b6f2ee1e731 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -28,6 +28,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -227,8 +228,9 @@ public class ReplicationSource implements ReplicationSourceInterface { public void addHFileRefs(TableName tableName, byte[] family, List> pairs) throws ReplicationException { String peerId = replicationPeer.getId(); + Set namespaces = replicationPeer.getNamespaces(); Map> tableCFMap = replicationPeer.getTableCFs(); - if (tableCFMap != null) { + if (tableCFMap != null) { // All peers with TableCFs List tableCfs = tableCFMap.get(tableName); if (tableCFMap.containsKey(tableName) && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) { @@ -236,7 +238,15 @@ public class ReplicationSource implements ReplicationSourceInterface { metrics.incrSizeOfHFileRefsQueue(pairs.size()); } else { LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", - tableName, Bytes.toString(family), peerId); + tableName, Bytes.toString(family), peerId); + } + } else if (namespaces != null) { // Only for set NAMESPACES peers + if (namespaces.contains(tableName.getNamespaceAsString())) { + this.queueStorage.addHFileRefs(peerId, pairs); + metrics.incrSizeOfHFileRefsQueue(pairs.size()); + } else { + LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", + tableName, Bytes.toString(family), peerId); } } else { 
// user has explicitly not defined any table cfs for replication, means replicate all the diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java index 6fd7288042c..063c70bc330 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; @@ -122,8 +123,8 @@ public class TestBulkLoadReplication extends TestReplicationBase { private static AtomicInteger BULK_LOADS_COUNT; private static CountDownLatch BULK_LOAD_LATCH; - private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility(); - private static final Configuration CONF3 = UTIL3.getConfiguration(); + protected static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility(); + protected static final Configuration CONF3 = UTIL3.getConfiguration(); private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir"); @@ -220,7 +221,7 @@ public class TestBulkLoadReplication extends TestReplicationBase { UTIL3.getAdmin().removeReplicationPeer(PEER_ID2); } - private static void setupBulkLoadConfigsForCluster(Configuration config, + protected static void setupBulkLoadConfigsForCluster(Configuration config, String clusterReplicationId) throws Exception { config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); config.set(REPLICATION_CLUSTER_ID, clusterReplicationId); @@ -238,13 +239,16 @@ public 
class TestBulkLoadReplication extends TestReplicationBase { Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName); byte[] row = Bytes.toBytes("001"); byte[] value = Bytes.toBytes("v1"); - assertBulkLoadConditions(row, value, UTIL1, peer1TestTable, peer2TestTable, peer3TestTable); + assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, + peer2TestTable, peer3TestTable); row = Bytes.toBytes("002"); value = Bytes.toBytes("v2"); - assertBulkLoadConditions(row, value, UTIL2, peer1TestTable, peer2TestTable, peer3TestTable); + assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable, + peer2TestTable, peer3TestTable); row = Bytes.toBytes("003"); value = Bytes.toBytes("v3"); - assertBulkLoadConditions(row, value, UTIL3, peer1TestTable, peer2TestTable, peer3TestTable); + assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable, + peer2TestTable, peer3TestTable); //Additional wait to make sure no extra bulk load happens Thread.sleep(400); //We have 3 bulk load events (1 initiated on each cluster). 
@@ -278,18 +282,18 @@ public class TestBulkLoadReplication extends TestReplicationBase { } - private void assertBulkLoadConditions(byte[] row, byte[] value, + protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value, HBaseTestingUtility utility, Table...tables) throws Exception { BULK_LOAD_LATCH = new CountDownLatch(3); - bulkLoadOnCluster(row, value, utility); + bulkLoadOnCluster(tableName, row, value, utility); assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES)); assertTableHasValue(tables[0], row, value); assertTableHasValue(tables[1], row, value); assertTableHasValue(tables[2], row, value); } - private void bulkLoadOnCluster(byte[] row, byte[] value, - HBaseTestingUtility cluster) throws Exception { + protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value, + HBaseTestingUtility cluster) throws Exception { String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration()); copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster()); BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration()); @@ -302,13 +306,19 @@ public class TestBulkLoadReplication extends TestReplicationBase { cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir); } - private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception { + protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception { Get get = new Get(row); Result result = table.get(get); assertTrue(result.advance()); assertEquals(Bytes.toString(value), Bytes.toString(result.value())); } + protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception { + Get get = new Get(row); + Result result = table.get(get); + assertTrue(result.isEmpty()); + } + private String createHFileForFamilies(byte[] row, byte[] value, Configuration clusterConfig) throws IOException { CellBuilder cellBuilder = 
CellBuilderFactory.create(CellBuilderType.DEEP_COPY); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java new file mode 100644 index 00000000000..48790b33f96 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.TestBulkLoadReplication; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Testcase for HBASE-23098 + */ +@Category({ ReplicationTests.class, MediumTests.class }) +public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLoadReplication { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestNamespaceReplicationWithBulkLoadedData.class); + private static final 
Logger LOG = + LoggerFactory.getLogger(TestNamespaceReplicationWithBulkLoadedData.class); + + private static final HBaseTestingUtility UTIL4 = new HBaseTestingUtility(); + private static final String PEER4_CLUSTER_ID = "peer4"; + private static final String PEER4_NS = "ns_peer1"; + private static final String PEER4_NS_TABLE = "ns_peer2"; + + private static final Configuration CONF4 = UTIL4.getConfiguration(); + + private static final String NS1 = "ns1"; + private static final String NS2 = "ns2"; + + private static final TableName NS1_TABLE = TableName.valueOf(NS1 + ":t1_syncup"); + private static final TableName NS2_TABLE = TableName.valueOf(NS2 + ":t2_syncup"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + setupBulkLoadConfigsForCluster(CONF4, PEER4_CLUSTER_ID); + setupConfig(UTIL4, "/4"); + TestBulkLoadReplication.setUpBeforeClass(); + startFourthCluster(); + } + + private static void startFourthCluster() throws Exception { + LOG.info("Setup Zk to same one from UTIL1 and UTIL2 and UTIL3"); + UTIL4.setZkCluster(UTIL1.getZkCluster()); + UTIL4.startMiniCluster(NUM_SLAVES1); + + TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); + + Connection connection4 = ConnectionFactory.createConnection(CONF4); + try (Admin admin4 = connection4.getAdmin()) { + admin4.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + } + UTIL4.waitUntilAllRegionsAssigned(tableName); + } + + @Before + @Override + public void setUpBase() throws Exception { + /** "super.setUpBase()" already sets peer1 from 1 <-> 2 <-> 3 + * and this test add the fourth cluster. 
+ * So we have the following topology: + * 1 + * / \ + * 2 4 + * / + * 3 + * + * The 1 -> 4 link has two peers, + * ns_peer1: ns1 -> ns1 (the peer whose hfile-refs are validated) + * ns_peer1 configuration is NAMESPACES => ["ns1"] + * + * ns_peer2: ns2:t2_syncup -> ns2:t2_syncup; the + * ns_peer2 configuration is TABLE_CFS only (no NAMESPACES), + * TABLE_CFS => { "ns2:t2_syncup" => [] } + * + * The 1 -> 2 link has one peer; its configuration is + * add_peer '2', CLUSTER_KEY => "server1.cie.com:2181:/hbase" + * + */ + super.setUpBase(); + + // Create tables + TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE) + .setColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(famName) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); + + TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE) + .setColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(famName) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); + + Admin admin1 = UTIL1.getAdmin(); + admin1.createNamespace(NamespaceDescriptor.create(NS1).build()); + admin1.createNamespace(NamespaceDescriptor.create(NS2).build()); + admin1.createTable(table1); + admin1.createTable(table2); + + Admin admin2 = UTIL2.getAdmin(); + admin2.createNamespace(NamespaceDescriptor.create(NS1).build()); + admin2.createNamespace(NamespaceDescriptor.create(NS2).build()); + admin2.createTable(table1); + admin2.createTable(table2); + + Admin admin3 = UTIL3.getAdmin(); + admin3.createNamespace(NamespaceDescriptor.create(NS1).build()); + admin3.createNamespace(NamespaceDescriptor.create(NS2).build()); + admin3.createTable(table1); + admin3.createTable(table2); + + Admin admin4 = UTIL4.getAdmin(); + admin4.createNamespace(NamespaceDescriptor.create(NS1).build()); + admin4.createNamespace(NamespaceDescriptor.create(NS2).build()); + admin4.createTable(table1); +
admin4.createTable(table2); + + /** + * Set ns_peer1 1: ns1 -> 4: ns1 + * + * add_peer 'ns_peer1', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod", + * NAMESPACES => ["ns1"] + */ + Set namespaces = new HashSet<>(); + namespaces.add(NS1); + ReplicationPeerConfig rpc4_ns = + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey()) + .setReplicateAllUserTables(false).setNamespaces(namespaces).build(); + admin1.addReplicationPeer(PEER4_NS, rpc4_ns); + + /** + * Set ns_peer2 1: ns2:t2_syncup -> 4: ns2:t2_syncup + * + * add_peer 'ns_peer2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod", + * TABLE_CFS => { "ns2:t2_syncup" => [] } (no NAMESPACES are set) + */ + Map> tableCFsMap = new HashMap<>(); + tableCFsMap.put(NS2_TABLE, null); + ReplicationPeerConfig rpc4_ns_table = + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey()) + .setReplicateAllUserTables(false).setTableCFsMap(tableCFsMap).build(); + admin1.addReplicationPeer(PEER4_NS_TABLE, rpc4_ns_table); + } + + @After + @Override + public void tearDownBase() throws Exception { + super.tearDownBase(); + TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE) + .setColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(famName) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); + + TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE) + .setColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(famName) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); + Admin admin1 = UTIL1.getAdmin(); + admin1.disableTable(table1.getTableName()); + admin1.deleteTable(table1.getTableName()); + admin1.disableTable(table2.getTableName()); + admin1.deleteTable(table2.getTableName()); + admin1.deleteNamespace(NS1); + admin1.deleteNamespace(NS2); + + Admin admin2 = UTIL2.getAdmin(); +
admin2.disableTable(table1.getTableName()); + admin2.deleteTable(table1.getTableName()); + admin2.disableTable(table2.getTableName()); + admin2.deleteTable(table2.getTableName()); + admin2.deleteNamespace(NS1); + admin2.deleteNamespace(NS2); + + Admin admin3 = UTIL3.getAdmin(); + admin3.disableTable(table1.getTableName()); + admin3.deleteTable(table1.getTableName()); + admin3.disableTable(table2.getTableName()); + admin3.deleteTable(table2.getTableName()); + admin3.deleteNamespace(NS1); + admin3.deleteNamespace(NS2); + + Admin admin4 = UTIL4.getAdmin(); + admin4.disableTable(table1.getTableName()); + admin4.deleteTable(table1.getTableName()); + admin4.disableTable(table2.getTableName()); + admin4.deleteTable(table2.getTableName()); + admin4.deleteNamespace(NS1); + admin4.deleteNamespace(NS2); + UTIL1.getAdmin().removeReplicationPeer(PEER4_NS); + UTIL1.getAdmin().removeReplicationPeer(PEER4_NS_TABLE); + } + + @Test + @Override + public void testBulkLoadReplicationActiveActive() throws Exception { + Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName); + Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName); + Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName); + Table notPeerTable = UTIL4.getConnection().getTable(TestReplicationBase.tableName); + Table ns1Table = UTIL4.getConnection().getTable(NS1_TABLE); + Table ns2Table = UTIL4.getConnection().getTable(NS2_TABLE); + + // case1: A bulk load into the ns1 table on cluster1 is replicated to cluster4 + byte[] row = Bytes.toBytes("002_ns_peer"); + byte[] value = Bytes.toBytes("v2"); + bulkLoadOnCluster(ns1Table.getName(), row, value, UTIL1); + waitForReplication(ns1Table, 1, NB_RETRIES); + assertTableHasValue(ns1Table, row, value); + + // case2: A bulk load into ns2:t2_syncup on cluster1 is replicated to cluster4 + // Without the HBASE-23098 fix, ns_peer1's hfile-refs (in ZK) would back up + row = Bytes.toBytes("003_ns_table_peer"); + value = Bytes.toBytes("v2"); +
bulkLoadOnCluster(ns2Table.getName(), row, value, UTIL1); + waitForReplication(ns2Table, 1, NB_RETRIES); + assertTableHasValue(ns2Table, row, value); + + // case3: The base test table is replicated to cluster1, cluster2 and cluster3, + // but not to cluster4, because no 1 -> 4 peer is configured for that table. + row = Bytes.toBytes("001_nopeer"); + value = Bytes.toBytes("v1"); + assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, + peer2TestTable, peer3TestTable); + assertTableNoValue(notPeerTable, row, value); // 1 -> 4, table is empty + + // Verify the hfile-refs recorded for cluster 1 (incl. ns_peer1): expected to be empty + MiniZooKeeperCluster zkCluster = UTIL1.getZkCluster(); + ZKWatcher watcher = new ZKWatcher(UTIL1.getConfiguration(), "TestZnodeHFiles-refs", null); + RecoverableZooKeeper zk = ZKUtil.connect(UTIL1.getConfiguration(), watcher); + ZKReplicationQueueStorage replicationQueueStorage = + new ZKReplicationQueueStorage(watcher, UTIL1.getConfiguration()); + Set hfiles = replicationQueueStorage.getAllHFileRefs(); + assertTrue(hfiles.isEmpty()); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index ad88aa44ebf..94150b8fda8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -138,6 +138,11 @@ public class TestReplicationBase { protected static void waitForReplication(int expectedRows, int retries) throws IOException, InterruptedException { + waitForReplication(htable2, expectedRows, retries); + } + + protected static void waitForReplication(Table htable2, int expectedRows, int retries) + throws IOException, InterruptedException { Scan scan; for (int i = 0; i < retries; i++) { scan = new Scan();