diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java index cd051d23b83..4022195d1d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -59,8 +60,8 @@ public class TestReplicationDroppedTables extends TestReplicationBase { public void setUp() throws Exception { // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue - for ( JVMClusterUtil.RegionServerThread r : - utility1.getHBaseCluster().getRegionServerThreads()) { + for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster() + .getRegionServerThreads()) { utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); } int rowCount = utility1.countRows(tableName); @@ -73,7 +74,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase { Scan scan = new Scan(); int lastCount = 0; for (int i = 0; i < NB_RETRIES; i++) { - if (i==NB_RETRIES-1) { + if (i == NB_RETRIES - 1) { fail("Waited too much time for truncate"); } ResultScanner scanner = htable2.getScanner(scan); @@ -90,6 +91,12 @@ public class TestReplicationDroppedTables extends TestReplicationBase { break; } } + // Set the max request size to a tiny 10K for dividing the replication WAL entries into multiple + // batches. the default max request size is 256M, so all replication entries are in a batch, but + // when replicate at sink side, it'll apply to rs group by table name, so the WAL of test table + // may apply first, and then test_dropped table, and we will believe that the replication is not + // got stuck (HBASE-20475). + conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024); } @Test @@ -164,15 +171,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase { // put some data (lead with 0 so the edit gets sorted before the other table's edits // in the replication batch) // write a bunch of edits, making sure we fill a batch - byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped"); - Put put = new Put(rowkey); + byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped"); + Put put = new Put(rowKey); put.addColumn(familyname, row, row); lHtable1.put(put); - rowkey = Bytes.toBytes("normal put"); - put = new Put(rowkey); - put.addColumn(famName, row, row); - htable1.put(put); + for (int i = 0; i < 1000; i++) { + rowKey = Bytes.toBytes("NormalPut" + i); + put = new Put(rowKey).addColumn(famName, row, row); + htable1.put(put); + } try (Admin admin1 = connection1.getAdmin()) { admin1.disableTable(tablename); @@ -186,9 +194,9 @@ public class TestReplicationDroppedTables extends TestReplicationBase { admin.enablePeer("2"); if (allowProceeding) { // in this we'd expect the key to make it over - verifyReplicationProceeded(rowkey); + verifyReplicationProceeded(rowKey); } else { - verifyReplicationStuck(rowkey); + verifyReplicationStuck(rowKey); } // just to be safe conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); @@ -230,17 +238,18 @@ public class TestReplicationDroppedTables extends TestReplicationBase { admin.disablePeer("2"); // put some data (lead with 0 so the edit gets sorted before the other table's edits - // in the replication batch) + // in the replication batch) // write a bunch of edits, making sure we fill a batch - byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped"); - Put put = new Put(rowkey); + byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped"); + Put put = new Put(rowKey); put.addColumn(familyname, row, row); lHtable1.put(put); - rowkey = Bytes.toBytes("normal put"); - put = new Put(rowkey); - put.addColumn(famName, row, row); - htable1.put(put); + for (int i = 0; i < 1000; i++) { + rowKey = Bytes.toBytes("NormalPut" + i); + put = new Put(rowKey).addColumn(famName, row, row); + htable1.put(put); + } try (Admin admin2 = connection2.getAdmin()) { admin2.disableTable(tablename); @@ -252,16 +261,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase { try (Admin admin1 = connection1.getAdmin()) { // the source table still exists, replication should be stalled - verifyReplicationStuck(rowkey); + verifyReplicationStuck(rowKey); admin1.disableTable(tablename); // still stuck, source table still exists - verifyReplicationStuck(rowkey); + verifyReplicationStuck(rowKey); admin1.deleteTable(tablename); // now the source table is gone, replication should proceed, the // offending edits be dropped - verifyReplicationProceeded(rowkey); + verifyReplicationProceeded(rowKey); } // just to be safe conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);