diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index b2c5aef650b..f96dbe5dc17 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -243,7 +243,7 @@ public class TestReplicationBase { } @Before - public void setUpBase() throws IOException { + public void setUpBase() throws Exception { if (!peerExist(PEER_ID2)) { ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() .setClusterKey(utility2.getClusterKey()).setSerial(isSerialPeer()).build(); @@ -252,7 +252,7 @@ public class TestReplicationBase { } @After - public void tearDownBase() throws IOException { + public void tearDownBase() throws Exception { if (peerExist(PEER_ID2)) { hbaseAdmin.removeReplicationPeer(PEER_ID2); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java index 4022195d1d5..f280c7c193a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java @@ -17,29 +17,31 @@ */ package org.apache.hadoop.hbase.replication; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.fail; +import java.io.IOException; + import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -55,15 +57,18 @@ public class TestReplicationDroppedTables extends TestReplicationBase { HBaseClassTestRule.forClass(TestReplicationDroppedTables.class); private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class); + private static final int ROWS_COUNT = 1000; @Before - public void setUp() throws Exception { + public void setUpBase() throws Exception { // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster() .getRegionServerThreads()) { utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); } + // Initialize the peer after wal rolling, so that we will abandon the stuck WALs. + super.setUpBase(); int rowCount = utility1.countRows(tableName); utility1.deleteTableData(tableName); // truncating the table will send one Delete per row to the slave cluster @@ -101,9 +106,8 @@ public class TestReplicationDroppedTables extends TestReplicationBase { @Test public void testEditsStuckBehindDroppedTable() throws Exception { - // Sanity check - // Make sure by default edits for dropped tables stall the replication queue, even when the - // table(s) in question have been deleted on both ends. + // Sanity check Make sure by default edits for dropped tables stall the replication queue, even + // when the table(s) in question have been deleted on both ends. testEditsBehindDroppedTable(false, "test_dropped"); } @@ -134,6 +138,10 @@ public class TestReplicationDroppedTables extends TestReplicationBase { } } + private byte[] generateRowKey(int id) { + return Bytes.toBytes(String.format("NormalPut%03d", id)); + } + private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception { conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding); conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1); @@ -144,13 +152,14 @@ public class TestReplicationDroppedTables extends TestReplicationBase { utility1.startMiniHBaseCluster(1, 1); TableName tablename = TableName.valueOf(tName); - byte[] familyname = Bytes.toBytes("fam"); + byte[] familyName = Bytes.toBytes("fam"); byte[] row = Bytes.toBytes("row"); - HTableDescriptor table = new HTableDescriptor(tablename); - HColumnDescriptor fam = new HColumnDescriptor(familyname); - fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); - table.addFamily(fam); + TableDescriptor table = + TableDescriptorBuilder + .newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) + .build(); Connection connection1 = ConnectionFactory.createConnection(conf1); Connection connection2 = ConnectionFactory.createConnection(conf2); @@ -163,23 +172,25 @@ public class TestReplicationDroppedTables extends TestReplicationBase { utility1.waitUntilAllRegionsAssigned(tablename); utility2.waitUntilAllRegionsAssigned(tablename); - Table lHtable1 = utility1.getConnection().getTable(tablename); - // now suspend replication - admin.disablePeer("2"); + try (Admin admin1 = connection1.getAdmin()) { + admin1.disableReplicationPeer(PEER_ID2); + } // put some data (lead with 0 so the edit gets sorted before the other table's edits - // in the replication batch) - // write a bunch of edits, making sure we fill a batch - byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped"); - Put put = new Put(rowKey); - put.addColumn(familyname, row, row); - lHtable1.put(put); + // in the replication batch) write a bunch of edits, making sure we fill a batch + try (Table droppedTable = connection1.getTable(tablename)) { + byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped"); + Put put = new Put(rowKey); + put.addColumn(familyName, row, row); + droppedTable.put(put); + } - for (int i = 0; i < 1000; i++) { - rowKey = Bytes.toBytes("NormalPut" + i); - put = new Put(rowKey).addColumn(famName, row, row); - htable1.put(put); + try (Table table1 = connection1.getTable(tableName)) { + for (int i = 0; i < ROWS_COUNT; i++) { + Put put = new Put(generateRowKey(i)).addColumn(famName, row, row); + table1.put(put); + } } try (Admin admin1 = connection1.getAdmin()) { @@ -191,12 +202,15 @@ public class TestReplicationDroppedTables extends TestReplicationBase { admin2.deleteTable(tablename); } - admin.enablePeer("2"); + try (Admin admin1 = connection1.getAdmin()) { + admin1.enableReplicationPeer(PEER_ID2); + } + if (allowProceeding) { // in this we'd expect the key to make it over - verifyReplicationProceeded(rowKey); + verifyReplicationProceeded(); } else { - verifyReplicationStuck(rowKey); + verifyReplicationStuck(); } // just to be safe conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); @@ -213,13 +227,14 @@ public class TestReplicationDroppedTables extends TestReplicationBase { utility1.startMiniHBaseCluster(1, 1); TableName tablename = TableName.valueOf("testdroppedtimed"); - byte[] familyname = Bytes.toBytes("fam"); + byte[] familyName = Bytes.toBytes("fam"); byte[] row = Bytes.toBytes("row"); - HTableDescriptor table = new HTableDescriptor(tablename); - HColumnDescriptor fam = new HColumnDescriptor(familyname); - fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); - table.addFamily(fam); + TableDescriptor table = + TableDescriptorBuilder + .newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) + .build(); Connection connection1 = ConnectionFactory.createConnection(conf1); Connection connection2 = ConnectionFactory.createConnection(conf2); @@ -232,23 +247,25 @@ public class TestReplicationDroppedTables extends TestReplicationBase { utility1.waitUntilAllRegionsAssigned(tablename); utility2.waitUntilAllRegionsAssigned(tablename); - Table lHtable1 = utility1.getConnection().getTable(tablename); - // now suspend replication - admin.disablePeer("2"); + try (Admin admin1 = connection1.getAdmin()) { + admin1.disableReplicationPeer(PEER_ID2); + } // put some data (lead with 0 so the edit gets sorted before the other table's edits - // in the replication batch) - // write a bunch of edits, making sure we fill a batch - byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped"); - Put put = new Put(rowKey); - put.addColumn(familyname, row, row); - lHtable1.put(put); + // in the replication batch) write a bunch of edits, making sure we fill a batch + try (Table droppedTable = connection1.getTable(tablename)) { + byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped"); + Put put = new Put(rowKey); + put.addColumn(familyName, row, row); + droppedTable.put(put); + } - for (int i = 0; i < 1000; i++) { - rowKey = Bytes.toBytes("NormalPut" + i); - put = new Put(rowKey).addColumn(famName, row, row); - htable1.put(put); + try (Table table1 = connection1.getTable(tableName)) { + for (int i = 0; i < ROWS_COUNT; i++) { + Put put = new Put(generateRowKey(i)).addColumn(famName, row, row); + table1.put(put); + } } try (Admin admin2 = connection2.getAdmin()) { @@ -256,48 +273,56 @@ public class TestReplicationDroppedTables extends TestReplicationBase { admin2.deleteTable(tablename); } - admin.enablePeer("2"); // edit should still be stuck - try (Admin admin1 = connection1.getAdmin()) { + // enable the replication peer. + admin1.enableReplicationPeer(PEER_ID2); // the source table still exists, replication should be stalled - verifyReplicationStuck(rowKey); + verifyReplicationStuck(); admin1.disableTable(tablename); // still stuck, source table still exists - verifyReplicationStuck(rowKey); + verifyReplicationStuck(); admin1.deleteTable(tablename); // now the source table is gone, replication should proceed, the // offending edits be dropped - verifyReplicationProceeded(rowKey); + verifyReplicationProceeded(); } // just to be safe conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); } - private void verifyReplicationProceeded(byte[] rowkey) throws Exception { - Get get = new Get(rowkey); + private boolean peerHasAllNormalRows() throws IOException { + try (ResultScanner scanner = htable2.getScanner(new Scan())) { + Result[] results = scanner.next(ROWS_COUNT); + if (results.length != ROWS_COUNT) { + return false; + } + for (int i = 0; i < results.length; i++) { + Assert.assertArrayEquals(generateRowKey(i), results[i].getRow()); + } + return true; + } + } + + private void verifyReplicationProceeded() throws Exception { for (int i = 0; i < NB_RETRIES; i++) { - if (i==NB_RETRIES-1) { + if (i == NB_RETRIES - 1) { fail("Waited too much time for put replication"); } - Result res = htable2.get(get); - if (res.size() == 0) { + if (!peerHasAllNormalRows()) { LOG.info("Row not available"); Thread.sleep(SLEEP_TIME); } else { - assertArrayEquals(res.getRow(), rowkey); break; } } } - private void verifyReplicationStuck(byte[] rowkey) throws Exception { - Get get = new Get(rowkey); + private void verifyReplicationStuck() throws Exception { for (int i = 0; i < NB_RETRIES; i++) { - Result res = htable2.get(get); - if (res.size() >= 1) { + if (peerHasAllNormalRows()) { fail("Edit should have been stuck behind dropped tables"); } else { LOG.info("Row not replicated, let's wait a bit more..."); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java index 3d2cdfbfbf0..745c4391681 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertEquals; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -102,7 +101,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase { } @After - public void tearDownBase() throws IOException { + public void tearDownBase() throws Exception { // Do nothing, just replace the super tearDown. because the super tearDown will use the // out-of-data HBase admin to remove replication peer, which will be result in failure. }