HBASE-20475 Fix the flaky TestReplicationDroppedTables unit test.

This commit is contained in:
huzheng 2018-04-25 10:56:56 +08:00
parent a8be3bb814
commit 12c45cb2e8
1 changed file with 30 additions and 21 deletions

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -59,8 +60,8 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
public void setUp() throws Exception { public void setUp() throws Exception {
// Starting and stopping replication can make us miss new logs, // Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue // rolling like this makes sure the most recent one gets added to the queue
for ( JVMClusterUtil.RegionServerThread r : for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
utility1.getHBaseCluster().getRegionServerThreads()) { .getRegionServerThreads()) {
utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
} }
int rowCount = utility1.countRows(tableName); int rowCount = utility1.countRows(tableName);
@ -73,7 +74,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
Scan scan = new Scan(); Scan scan = new Scan();
int lastCount = 0; int lastCount = 0;
for (int i = 0; i < NB_RETRIES; i++) { for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) { if (i == NB_RETRIES - 1) {
fail("Waited too much time for truncate"); fail("Waited too much time for truncate");
} }
ResultScanner scanner = htable2.getScanner(scan); ResultScanner scanner = htable2.getScanner(scan);
@ -90,6 +91,12 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
break; break;
} }
} }
// Set the max request size to a tiny 10K for dividing the replication WAL entries into multiple
// batches. The default max request size is 256M, so all replication entries would fit in a single
// batch; but when replicating at the sink side, edits are applied to region servers grouped by
// table name, so the WAL entries of the test table may be applied first, then those of the
// test_dropped table, and we would mistakenly conclude that replication is not stuck (HBASE-20475).
conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
} }
@Test @Test
@ -164,15 +171,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
// put some data (lead with 0 so the edit gets sorted before the other table's edits // put some data (lead with 0 so the edit gets sorted before the other table's edits
// in the replication batch) // in the replication batch)
// write a bunch of edits, making sure we fill a batch // write a bunch of edits, making sure we fill a batch
byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped"); byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
Put put = new Put(rowkey); Put put = new Put(rowKey);
put.addColumn(familyname, row, row); put.addColumn(familyname, row, row);
lHtable1.put(put); lHtable1.put(put);
rowkey = Bytes.toBytes("normal put"); for (int i = 0; i < 1000; i++) {
put = new Put(rowkey); rowKey = Bytes.toBytes("NormalPut" + i);
put.addColumn(famName, row, row); put = new Put(rowKey).addColumn(famName, row, row);
htable1.put(put); htable1.put(put);
}
try (Admin admin1 = connection1.getAdmin()) { try (Admin admin1 = connection1.getAdmin()) {
admin1.disableTable(tablename); admin1.disableTable(tablename);
@ -186,9 +194,9 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
admin.enablePeer("2"); admin.enablePeer("2");
if (allowProceeding) { if (allowProceeding) {
// in this we'd expect the key to make it over // in this we'd expect the key to make it over
verifyReplicationProceeded(rowkey); verifyReplicationProceeded(rowKey);
} else { } else {
verifyReplicationStuck(rowkey); verifyReplicationStuck(rowKey);
} }
// just to be safe // just to be safe
conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
@ -232,15 +240,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
// put some data (lead with 0 so the edit gets sorted before the other table's edits // put some data (lead with 0 so the edit gets sorted before the other table's edits
// in the replication batch) // in the replication batch)
// write a bunch of edits, making sure we fill a batch // write a bunch of edits, making sure we fill a batch
byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped"); byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
Put put = new Put(rowkey); Put put = new Put(rowKey);
put.addColumn(familyname, row, row); put.addColumn(familyname, row, row);
lHtable1.put(put); lHtable1.put(put);
rowkey = Bytes.toBytes("normal put"); for (int i = 0; i < 1000; i++) {
put = new Put(rowkey); rowKey = Bytes.toBytes("NormalPut" + i);
put.addColumn(famName, row, row); put = new Put(rowKey).addColumn(famName, row, row);
htable1.put(put); htable1.put(put);
}
try (Admin admin2 = connection2.getAdmin()) { try (Admin admin2 = connection2.getAdmin()) {
admin2.disableTable(tablename); admin2.disableTable(tablename);
@ -252,16 +261,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
try (Admin admin1 = connection1.getAdmin()) { try (Admin admin1 = connection1.getAdmin()) {
// the source table still exists, replication should be stalled // the source table still exists, replication should be stalled
verifyReplicationStuck(rowkey); verifyReplicationStuck(rowKey);
admin1.disableTable(tablename); admin1.disableTable(tablename);
// still stuck, source table still exists // still stuck, source table still exists
verifyReplicationStuck(rowkey); verifyReplicationStuck(rowKey);
admin1.deleteTable(tablename); admin1.deleteTable(tablename);
// now the source table is gone, replication should proceed, the // now the source table is gone, replication should proceed, the
// offending edits be dropped // offending edits be dropped
verifyReplicationProceeded(rowkey); verifyReplicationProceeded(rowKey);
} }
// just to be safe // just to be safe
conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);