From 26037854adc980483d1a094ad3d7fa14b394910e Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 3 Jun 2019 20:47:32 +0800 Subject: [PATCH] HBASE-22524 Refactor TestReplicationSyncUpTool --- .../replication/TestVerifyReplication.java | 128 ++++--- .../TestReplicationAdminWithClusters.java | 14 +- .../replication/TestNamespaceReplication.java | 4 +- .../replication/TestReplicationBase.java | 121 +++---- ...tReplicationChangingPeerRegionservers.java | 10 +- .../TestReplicationDisableInactivePeer.java | 4 +- .../TestReplicationDroppedTables.java | 50 +-- .../TestReplicationEmptyWALRecovery.java | 22 +- .../replication/TestReplicationEndpoint.java | 95 ++--- .../TestReplicationKillMasterRS.java | 2 +- ...TestReplicationKillMasterRSCompressed.java | 2 +- ...cationKillMasterRSWithSeparateOldWALs.java | 4 +- .../replication/TestReplicationKillRS.java | 6 +- .../TestReplicationKillSlaveRS.java | 2 +- ...icationKillSlaveRSWithSeparateOldWALs.java | 4 +- .../TestReplicationMetricsforUI.java | 6 +- .../TestReplicationSmallTests.java | 12 +- .../replication/TestReplicationStatus.java | 8 +- .../TestReplicationStatusAfterLagging.java | 10 +- ...ionStatusBothNormalAndRecoveryLagging.java | 8 +- ...StatusSourceStartedTargetStoppedNewOp.java | 8 +- ...StatusSourceStartedTargetStoppedNoOps.java | 8 +- ...ourceStartedTargetStoppedWithRecovery.java | 8 +- .../TestReplicationSyncUpTool.java | 325 +++++------------- .../TestReplicationSyncUpToolBase.java | 141 ++++++++ ...plicationSyncUpToolWithBulkLoadedData.java | 135 ++++---- ...plicationEndpointWithMultipleAsyncWAL.java | 4 +- ...estReplicationEndpointWithMultipleWAL.java | 4 +- ...asterRSCompressedWithMultipleAsyncWAL.java | 4 +- ...KillMasterRSCompressedWithMultipleWAL.java | 4 +- ...icationSyncUpToolWithMultipleAsyncWAL.java | 12 +- ...tReplicationSyncUpToolWithMultipleWAL.java | 14 +- .../regionserver/TestReplicator.java | 22 +- 33 files changed, 589 insertions(+), 612 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java rename {hbase-endpoint => hbase-server}/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java (62%) diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java index 7d12cbcc6b0..7771c0a4299 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java @@ -27,8 +27,6 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.TreeMap; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -92,7 +90,7 @@ public class TestVerifyReplication extends TestReplicationBase { @Before public void setUp() throws Exception { cleanUp(); - utility2.deleteTableData(peerTableName); + UTIL2.deleteTableData(peerTableName); } @BeforeClass @@ -103,7 +101,7 @@ public class TestVerifyReplication extends TestReplicationBase { ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100) .build()).build(); - Connection connection2 = ConnectionFactory.createConnection(conf2); + Connection connection2 = ConnectionFactory.createConnection(CONF2); try (Admin admin2 = connection2.getAdmin()) { admin2.createTable(peerTable, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); } @@ -112,7 +110,7 @@ public class TestVerifyReplication extends TestReplicationBase { private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows) throws IOException, InterruptedException, ClassNotFoundException { - Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args); + Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args); if (job == null) { fail("Job wasn't created, see the log"); } @@ -174,24 +172,20 @@ public class TestVerifyReplication extends TestReplicationBase { .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build(); TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build(); - scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); - for (ColumnFamilyDescriptor f : table.getColumnFamilies()) { - scopes.put(f.getName(), f.getScope()); - } - Connection connection1 = ConnectionFactory.createConnection(conf1); - Connection connection2 = ConnectionFactory.createConnection(conf2); + Connection connection1 = ConnectionFactory.createConnection(CONF1); + Connection connection2 = ConnectionFactory.createConnection(CONF2); try (Admin admin1 = connection1.getAdmin()) { admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); } try (Admin admin2 = connection2.getAdmin()) { admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); } - utility1.waitUntilAllRegionsAssigned(tableName); - utility2.waitUntilAllRegionsAssigned(tableName); + UTIL1.waitUntilAllRegionsAssigned(tableName); + UTIL2.waitUntilAllRegionsAssigned(tableName); - lHtable1 = utility1.getConnection().getTable(tableName); - lHtable2 = utility2.getConnection().getTable(tableName); + lHtable1 = UTIL1.getConnection().getTable(tableName); + lHtable2 = UTIL2.getConnection().getTable(tableName); Put put = new Put(row); put.addColumn(familyname, row, row); @@ -442,30 +436,30 @@ public class TestVerifyReplication extends TestReplicationBase { runSmallBatchTest(); // Take source and target tables snapshot - Path rootDir = FSUtils.getRootDir(conf1); - FileSystem fs = rootDir.getFileSystem(conf1); + Path rootDir = FSUtils.getRootDir(CONF1); + FileSystem fs = rootDir.getFileSystem(CONF1); String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); // Take target snapshot - Path peerRootDir = FSUtils.getRootDir(conf2); - FileSystem peerFs = peerRootDir.getFileSystem(conf2); + Path peerRootDir = FSUtils.getRootDir(CONF2); + FileSystem peerFs = peerRootDir.getFileSystem(CONF2); String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName, Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); String peerFSAddress = peerFs.getUri().toString(); - String temPath1 = utility1.getRandomDir().toString(); + String temPath1 = UTIL1.getRandomDir().toString(); String temPath2 = "/tmp" + System.currentTimeMillis(); String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, - "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName, - "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress, - "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() }; + "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName, + "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress, + "--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), "2", tableName.getNameAsString() }; runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); - checkRestoreTmpDir(conf1, temPath1, 1); - checkRestoreTmpDir(conf2, temPath2, 1); + checkRestoreTmpDir(CONF1, temPath1, 1); + checkRestoreTmpDir(CONF2, temPath2, 1); Scan scan = new Scan(); ResultScanner rs = htable2.getScanner(scan); @@ -481,20 +475,20 @@ public class TestVerifyReplication extends TestReplicationBase { htable2.delete(delete); sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName, Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, - "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName, - "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress, - "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() }; + "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName, + "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress, + "--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), "2", tableName.getNameAsString() }; runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); - checkRestoreTmpDir(conf1, temPath1, 2); - checkRestoreTmpDir(conf2, temPath2, 2); + checkRestoreTmpDir(CONF1, temPath1, 2); + checkRestoreTmpDir(CONF2, temPath2, 2); } @Test @@ -504,7 +498,7 @@ public class TestVerifyReplication extends TestReplicationBase { runSmallBatchTest(); // with a quorum address (a cluster key) - String[] args = new String[] { utility2.getClusterKey(), tableName.getNameAsString() }; + String[] args = new String[] { UTIL2.getClusterKey(), tableName.getNameAsString() }; runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); Scan scan = new Scan(); @@ -529,31 +523,31 @@ public class TestVerifyReplication extends TestReplicationBase { runSmallBatchTest(); // Take source and target tables snapshot - Path rootDir = FSUtils.getRootDir(conf1); - FileSystem fs = rootDir.getFileSystem(conf1); + Path rootDir = FSUtils.getRootDir(CONF1); + FileSystem fs = rootDir.getFileSystem(CONF1); String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); // Take target snapshot - Path peerRootDir = FSUtils.getRootDir(conf2); - FileSystem peerFs = peerRootDir.getFileSystem(conf2); + Path peerRootDir = FSUtils.getRootDir(CONF2); + FileSystem peerFs = peerRootDir.getFileSystem(CONF2); String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName, Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); String peerFSAddress = peerFs.getUri().toString(); - String tmpPath1 = utility1.getRandomDir().toString(); + String tmpPath1 = UTIL1.getRandomDir().toString(); String tmpPath2 = "/tmp" + System.currentTimeMillis(); String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, - "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(), + "--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), UTIL2.getClusterKey(), tableName.getNameAsString() }; runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); - checkRestoreTmpDir(conf1, tmpPath1, 1); - checkRestoreTmpDir(conf2, tmpPath2, 1); + checkRestoreTmpDir(CONF1, tmpPath1, 1); + checkRestoreTmpDir(CONF2, tmpPath2, 1); Scan scan = new Scan(); ResultScanner rs = htable2.getScanner(scan); @@ -569,21 +563,21 @@ public class TestVerifyReplication extends TestReplicationBase { htable2.delete(delete); sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName, Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, - "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(), + "--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), UTIL2.getClusterKey(), tableName.getNameAsString() }; runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); - checkRestoreTmpDir(conf1, tmpPath1, 2); - checkRestoreTmpDir(conf2, tmpPath2, 2); + checkRestoreTmpDir(CONF1, tmpPath1, 2); + checkRestoreTmpDir(CONF2, tmpPath2, 2); } private static void runBatchCopyTest() throws Exception { @@ -621,10 +615,10 @@ public class TestVerifyReplication extends TestReplicationBase { // with a peerTableName along with quorum address (a cluster key) String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(), - utility2.getClusterKey(), tableName.getNameAsString() }; + UTIL2.getClusterKey(), tableName.getNameAsString() }; runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); - utility2.deleteTableData(peerTableName); + UTIL2.deleteTableData(peerTableName); runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); } @@ -634,32 +628,32 @@ public class TestVerifyReplication extends TestReplicationBase { runBatchCopyTest(); // Take source and target tables snapshot - Path rootDir = FSUtils.getRootDir(conf1); - FileSystem fs = rootDir.getFileSystem(conf1); + Path rootDir = FSUtils.getRootDir(CONF1); + FileSystem fs = rootDir.getFileSystem(CONF1); String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true); // Take target snapshot - Path peerRootDir = FSUtils.getRootDir(conf2); - FileSystem peerFs = peerRootDir.getFileSystem(conf2); + Path peerRootDir = FSUtils.getRootDir(CONF2); + FileSystem peerFs = peerRootDir.getFileSystem(CONF2); String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName, + SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName, Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true); String peerFSAddress = peerFs.getUri().toString(); - String tmpPath1 = utility1.getRandomDir().toString(); + String tmpPath1 = UTIL1.getRandomDir().toString(); String tmpPath2 = "/tmp" + System.currentTimeMillis(); String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(), "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, - "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(), + "--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), UTIL2.getClusterKey(), tableName.getNameAsString() }; runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); - checkRestoreTmpDir(conf1, tmpPath1, 1); - checkRestoreTmpDir(conf2, tmpPath2, 1); + checkRestoreTmpDir(CONF1, tmpPath1, 1); + checkRestoreTmpDir(CONF2, tmpPath2, 1); Scan scan = new Scan(); ResultScanner rs = htable3.getScanner(scan); @@ -675,22 +669,22 @@ public class TestVerifyReplication extends TestReplicationBase { htable3.delete(delete); sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true); peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName, + SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName, Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true); args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(), "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, - "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(), + "--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), UTIL2.getClusterKey(), tableName.getNameAsString() }; runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); - checkRestoreTmpDir(conf1, tmpPath1, 2); - checkRestoreTmpDir(conf2, tmpPath2, 2); + checkRestoreTmpDir(CONF1, tmpPath1, 2); + checkRestoreTmpDir(CONF2, tmpPath2, 2); } @AfterClass diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java index 46536c10f86..16fef673c54 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -74,11 +74,11 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { @BeforeClass public static void setUpBeforeClass() throws Exception { TestReplicationBase.setUpBeforeClass(); - connection1 = ConnectionFactory.createConnection(conf1); - connection2 = ConnectionFactory.createConnection(conf2); + connection1 = ConnectionFactory.createConnection(CONF1); + connection2 = ConnectionFactory.createConnection(CONF2); admin1 = connection1.getAdmin(); admin2 = connection2.getAdmin(); - adminExt = new ReplicationAdmin(conf1); + adminExt = new ReplicationAdmin(CONF1); } @AfterClass @@ -199,8 +199,8 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope()); } } finally { - utility1.deleteTable(tn); - utility2.deleteTable(tn); + UTIL1.deleteTable(tn); + UTIL2.deleteTable(tn); } } @@ -273,7 +273,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { public void testReplicationPeerConfigUpdateCallback() throws Exception { String peerId = "1"; ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(utility2.getClusterKey()); + rpc.setClusterKey(UTIL2.getClusterKey()); rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName()); rpc.getConfiguration().put("key1", "value1"); @@ -325,7 +325,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { @Override public UUID getPeerUUID() { - return utility1.getRandomUUID(); + return UTIL1.getRandomUUID(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java index d8a02c7b44b..7dcdf8ccee0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java @@ -101,8 +101,8 @@ public class TestNamespaceReplication extends TestReplicationBase { public static void setUpBeforeClass() throws Exception { TestReplicationBase.setUpBeforeClass(); - connection1 = ConnectionFactory.createConnection(conf1); - connection2 = ConnectionFactory.createConnection(conf2); + connection1 = ConnectionFactory.createConnection(CONF1); + connection2 = ConnectionFactory.createConnection(CONF2); admin1 = connection1.getAdmin(); admin2 = connection2.getAdmin(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 3091ef3bce4..219f9b4eac6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -24,18 +24,14 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.NavigableMap; -import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.StartMiniClusterOption; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -52,7 +48,6 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -72,22 +67,19 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; public class TestReplicationBase { private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class); - protected static Configuration conf1 = HBaseConfiguration.create(); - protected static Configuration conf2; protected static Configuration CONF_WITH_LOCALFS; - protected static ZKWatcher zkw1; - protected static ZKWatcher zkw2; - protected static ReplicationAdmin admin; protected static Admin hbaseAdmin; protected static Table htable1; protected static Table htable2; - protected static NavigableMap scopes; - protected static HBaseTestingUtility utility1; - protected static HBaseTestingUtility utility2; + protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility(); + protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility(); + protected static final Configuration CONF1 = UTIL1.getConfiguration(); + protected static final Configuration CONF2 = UTIL2.getConfiguration(); + protected static final int NUM_SLAVES1 = 2; protected static final int NUM_SLAVES2 = 4; protected static final int NB_ROWS_IN_BATCH = 100; @@ -113,12 +105,12 @@ public class TestReplicationBase { protected final void cleanUp() throws IOException, InterruptedException { // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue - for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster() + for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster() .getRegionServerThreads()) { - utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); + UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); } - int rowCount = utility1.countRows(tableName); - utility1.deleteTableData(tableName); + int rowCount = UTIL1.countRows(tableName); + UTIL1.deleteTableData(tableName); // truncating the table will send one Delete per row to the slave cluster // in an async fashion, which is why we cannot just call deleteTableData on // utility2 since late writes could make it to the slave in some way. @@ -180,92 +172,85 @@ public class TestReplicationBase { htable1.put(puts); } - protected static void configureClusters(){ - conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + private static void setupConfig(HBaseTestingUtility util, String znodeParent) { + Configuration conf = util.getConfiguration(); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent); // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger // sufficient number of events. But we don't want to go too low because // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want // more than one batch sent to the peer cluster for better testing. - conf1.setInt("replication.source.size.capacity", 102400); - conf1.setLong("replication.source.sleepforretries", 100); - conf1.setInt("hbase.regionserver.maxlogs", 10); - conf1.setLong("hbase.master.logcleaner.ttl", 10); - conf1.setInt("zookeeper.recovery.retry", 1); - conf1.setInt("zookeeper.recovery.retry.intervalmill", 10); - conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); - conf1.setInt("replication.stats.thread.period.seconds", 5); - conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false); - conf1.setLong("replication.sleep.before.failover", 2000); - conf1.setInt("replication.source.maxretriesmultiplier", 10); - conf1.setFloat("replication.source.ratio", 1.0f); - conf1.setBoolean("replication.source.eof.autorecovery", true); - conf1.setLong("hbase.serial.replication.waiting.ms", 100); + conf.setInt("replication.source.size.capacity", 102400); + conf.setLong("replication.source.sleepforretries", 100); + conf.setInt("hbase.regionserver.maxlogs", 10); + conf.setLong("hbase.master.logcleaner.ttl", 10); + conf.setInt("zookeeper.recovery.retry", 1); + conf.setInt("zookeeper.recovery.retry.intervalmill", 10); + conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf.setInt("replication.stats.thread.period.seconds", 5); + conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); + conf.setLong("replication.sleep.before.failover", 2000); + conf.setInt("replication.source.maxretriesmultiplier", 10); + conf.setFloat("replication.source.ratio", 1.0f); + conf.setBoolean("replication.source.eof.autorecovery", true); + conf.setLong("hbase.serial.replication.waiting.ms", 100); + } - utility1 = new HBaseTestingUtility(conf1); + static void configureClusters(HBaseTestingUtility util1, + HBaseTestingUtility util2) { + setupConfig(util1, "/1"); + setupConfig(util2, "/2"); - // Base conf2 on conf1 so it gets the right zk cluster. - conf2 = HBaseConfiguration.create(conf1); + Configuration conf2 = util2.getConfiguration(); conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); - - utility2 = new HBaseTestingUtility(conf2); } protected static void restartHBaseCluster(HBaseTestingUtility util, int numSlaves) throws Exception { util.shutdownMiniHBaseCluster(); - util - .startMiniHBaseCluster(StartMiniClusterOption.builder().numRegionServers(numSlaves).build()); + util.restartHBaseCluster(numSlaves); } - protected static void startClusters() throws Exception{ - utility1.startMiniZKCluster(); - MiniZooKeeperCluster miniZK = utility1.getZkCluster(); - // Have to reget conf1 in case zk cluster location different - // than default - conf1 = utility1.getConfiguration(); - zkw1 = new ZKWatcher(conf1, "cluster1", null, true); - admin = new ReplicationAdmin(conf1); + protected static void startClusters() throws Exception { + UTIL1.startMiniZKCluster(); + MiniZooKeeperCluster miniZK = UTIL1.getZkCluster(); + admin = new ReplicationAdmin(CONF1); LOG.info("Setup first Zk"); - utility2.setZkCluster(miniZK); - zkw2 = new ZKWatcher(conf2, "cluster2", null, true); + UTIL2.setZkCluster(miniZK); LOG.info("Setup second Zk"); - CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1); - utility1.startMiniCluster(NUM_SLAVES1); + CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1); + UTIL1.startMiniCluster(NUM_SLAVES1); // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks // as a component in deciding maximum number of parallel batches to send to the peer cluster. - utility2.startMiniCluster(NUM_SLAVES2); + UTIL2.startMiniCluster(NUM_SLAVES2); - hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin(); + hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin(); TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); - scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); - for (ColumnFamilyDescriptor f : table.getColumnFamilies()) { - scopes.put(f.getName(), f.getScope()); - } - Connection connection1 = ConnectionFactory.createConnection(conf1); - Connection connection2 = ConnectionFactory.createConnection(conf2); + + Connection connection1 = ConnectionFactory.createConnection(CONF1); + Connection connection2 = ConnectionFactory.createConnection(CONF2); try (Admin admin1 = connection1.getAdmin()) { admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); } try (Admin admin2 = connection2.getAdmin()) { admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); } - utility1.waitUntilAllRegionsAssigned(tableName); - utility2.waitUntilAllRegionsAssigned(tableName); + UTIL1.waitUntilAllRegionsAssigned(tableName); + UTIL2.waitUntilAllRegionsAssigned(tableName); htable1 = connection1.getTable(tableName); htable2 = connection2.getTable(tableName); } @BeforeClass public static void setUpBeforeClass() throws Exception { - configureClusters(); + configureClusters(UTIL1, UTIL2); startClusters(); } @@ -277,9 +262,9 @@ public class TestReplicationBase { public void setUpBase() throws Exception { if (!peerExist(PEER_ID2)) { ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder() - .setClusterKey(utility2.getClusterKey()).setSerial(isSerialPeer()); + .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()); if (isSyncPeer()) { - FileSystem fs2 = utility2.getTestFileSystem(); + FileSystem fs2 = UTIL2.getTestFileSystem(); // The remote wal dir is not important as we do not use it in DA state, here we only need to // confirm that a sync peer in DA state can still replicate data to remote cluster // asynchronously. @@ -303,7 +288,7 @@ public class TestReplicationBase { Put put = new Put(row); put.addColumn(famName, row, row); - htable1 = utility1.getConnection().getTable(tableName); + htable1 = UTIL1.getConnection().getTable(tableName); htable1.put(put); Get get = new Get(row); @@ -358,7 +343,7 @@ public class TestReplicationBase { htable2.close(); htable1.close(); admin.close(); - utility2.shutdownMiniCluster(); - utility1.shutdownMiniCluster(); + UTIL2.shutdownMiniCluster(); + UTIL1.shutdownMiniCluster(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java index 5c967422848..3eb58a4a663 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java @@ -88,11 +88,11 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas public void setUp() throws Exception { // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue - for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster() + for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster() .getRegionServerThreads()) { - utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); + UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); } - utility1.deleteTableData(tableName); + UTIL1.deleteTableData(tableName); // truncating the table will send one Delete per row to the slave cluster // in an async fashion, which is why we cannot just call deleteTableData on // utility2 since late writes could make it to the slave in some way. @@ -123,7 +123,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas @Test public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException { LOG.info("testSimplePutDelete"); - MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster(); + MiniHBaseCluster peerCluster = UTIL2.getMiniHBaseCluster(); int numRS = peerCluster.getRegionServerThreads().size(); doPutTest(Bytes.toBytes(1)); @@ -150,7 +150,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas put.addColumn(famName, row, row); if (htable1 == null) { - htable1 = utility1.getConnection().getTable(tableName); + htable1 = UTIL1.getConnection().getTable(tableName); } htable1.put(put); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java index dbb5164e690..4c034b1f671 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java @@ -54,7 +54,7 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase { */ @Test public void testDisableInactivePeer() throws Exception { - utility2.shutdownMiniHBaseCluster(); + UTIL2.shutdownMiniHBaseCluster(); byte[] rowkey = Bytes.toBytes("disable inactive peer"); Put put = new Put(rowkey); @@ -67,7 +67,7 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase { // disable and start the peer admin.disablePeer("2"); StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build(); - utility2.startMiniHBaseCluster(option); + UTIL2.startMiniHBaseCluster(option); Get get = new Get(rowkey); for (int i = 0; i < NB_RETRIES; i++) { Result res = htable2.get(get); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java index 31750a8bf9b..1d391d3df3a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java @@ -63,14 +63,14 @@ public class TestReplicationDroppedTables extends TestReplicationBase { public void setUpBase() throws Exception { // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue - for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster() + for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster() .getRegionServerThreads()) { - utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); + UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); } // Initialize the peer after wal rolling, so that we will abandon the stuck WALs. super.setUpBase(); - int rowCount = utility1.countRows(tableName); - utility1.deleteTableData(tableName); + int rowCount = UTIL1.countRows(tableName); + UTIL1.deleteTableData(tableName); // truncating the table will send one Delete per row to the slave cluster // in an async fashion, which is why we cannot just call deleteTableData on // utility2 since late writes could make it to the slave in some way. @@ -101,7 +101,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase { // when replicate at sink side, it'll apply to rs group by table name, so the WAL of test table // may apply first, and then test_dropped table, and we will believe that the replication is not // got stuck (HBASE-20475). - conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024); + CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024); } @Test @@ -121,11 +121,11 @@ public class TestReplicationDroppedTables extends TestReplicationBase { @Test public void testEditsDroppedWithDroppedTableNS() throws Exception { // also try with a namespace - Connection connection1 = ConnectionFactory.createConnection(conf1); + Connection connection1 = ConnectionFactory.createConnection(CONF1); try (Admin admin1 = connection1.getAdmin()) { admin1.createNamespace(NamespaceDescriptor.create("NS").build()); } - Connection connection2 = ConnectionFactory.createConnection(conf2); + Connection connection2 = ConnectionFactory.createConnection(CONF2); try (Admin admin2 = connection2.getAdmin()) { admin2.createNamespace(NamespaceDescriptor.create("NS").build()); } @@ -143,13 +143,13 @@ public class TestReplicationDroppedTables extends TestReplicationBase { } private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception { - conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding); - conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1); + CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding); + CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1); // make sure we have a single region server only, so that all // edits for all tables go there - utility1.shutdownMiniHBaseCluster(); - utility1.startMiniHBaseCluster(); + UTIL1.shutdownMiniHBaseCluster(); + UTIL1.startMiniHBaseCluster(); TableName tablename = TableName.valueOf(tName); byte[] familyName = Bytes.toBytes("fam"); @@ -161,16 +161,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase { .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) .build(); - Connection connection1 = ConnectionFactory.createConnection(conf1); - Connection connection2 = ConnectionFactory.createConnection(conf2); + Connection connection1 = ConnectionFactory.createConnection(CONF1); + Connection connection2 = ConnectionFactory.createConnection(CONF2); try (Admin admin1 = connection1.getAdmin()) { admin1.createTable(table); } try (Admin admin2 = connection2.getAdmin()) { admin2.createTable(table); } - utility1.waitUntilAllRegionsAssigned(tablename); - utility2.waitUntilAllRegionsAssigned(tablename); + UTIL1.waitUntilAllRegionsAssigned(tablename); + UTIL2.waitUntilAllRegionsAssigned(tablename); // now suspend replication try (Admin admin1 = connection1.getAdmin()) { @@ -213,18 +213,18 @@ public class TestReplicationDroppedTables extends TestReplicationBase { verifyReplicationStuck(); } // just to be safe - conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); + CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); } @Test public void testEditsBehindDroppedTableTiming() throws Exception { - conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true); - conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1); + CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true); + CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1); // make sure we have a single region server only, so that all // edits for all tables go there - utility1.shutdownMiniHBaseCluster(); - utility1.startMiniHBaseCluster(); + UTIL1.shutdownMiniHBaseCluster(); + UTIL1.startMiniHBaseCluster(); TableName tablename = TableName.valueOf("testdroppedtimed"); byte[] familyName = Bytes.toBytes("fam"); @@ -236,16 +236,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase { .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) .build(); - Connection connection1 = ConnectionFactory.createConnection(conf1); - Connection connection2 = ConnectionFactory.createConnection(conf2); + Connection connection1 = ConnectionFactory.createConnection(CONF1); + Connection connection2 = ConnectionFactory.createConnection(CONF2); try (Admin admin1 = connection1.getAdmin()) { admin1.createTable(table); } try (Admin admin2 = connection2.getAdmin()) { admin2.createTable(table); } - utility1.waitUntilAllRegionsAssigned(tablename); - utility2.waitUntilAllRegionsAssigned(tablename); + UTIL1.waitUntilAllRegionsAssigned(tablename); + UTIL2.waitUntilAllRegionsAssigned(tablename); // now suspend replication try (Admin admin1 = connection1.getAdmin()) { @@ -290,7 +290,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase { verifyReplicationProceeded(); } // just to be safe - conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); + CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); } private boolean peerHasAllNormalRows() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java index 4effe4149c5..c0f22a9ac12 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java @@ -55,16 +55,16 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase { * @param numRs number of regionservers */ private void waitForLogAdvance(int numRs) throws Exception { - Waiter.waitFor(conf1, 10000, new Waiter.Predicate() { + Waiter.waitFor(CONF1, 10000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { for (int i = 0; i < numRs; i++) { - HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i); + HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i); RegionInfo regionInfo = - utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); + UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); WAL wal = hrs.getWAL(regionInfo); Path currentFile = ((AbstractFSWAL) wal).getCurrentFileName(); - Replication replicationService = (Replication) utility1.getHBaseCluster() + Replication replicationService = (Replication) UTIL1.getHBaseCluster() .getRegionServer(i).getReplicationSourceService(); for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() .getSources()) { @@ -81,19 +81,19 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase { @Test public void testEmptyWALRecovery() throws Exception { - final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size(); + final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); // for each RS, create an empty wal with same walGroupId final List emptyWalPaths = new ArrayList<>(); long ts = System.currentTimeMillis(); for (int i = 0; i < numRs; i++) { RegionInfo regionInfo = - utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); - WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); + UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); + WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); - Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts); - utility1.getTestFileSystem().create(emptyWalPath).close(); + Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); + UTIL1.getTestFileSystem().create(emptyWalPath).close(); emptyWalPaths.add(emptyWalPath); } @@ -102,12 +102,12 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase { // determine if the file being replicated currently is still opened for write, so just inject a // new wal to the replication queue does not mean the previous file is closed. for (int i = 0; i < numRs; i++) { - HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i); + HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i); Replication replicationService = (Replication) hrs.getReplicationSourceService(); replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i)); replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i)); RegionInfo regionInfo = - utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); + UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); WAL wal = hrs.getWAL(regionInfo); wal.rollWriter(true); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 03fbb59f26d..b909d8f1bd5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -83,7 +83,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { @BeforeClass public static void setUpBeforeClass() throws Exception { TestReplicationBase.setUpBeforeClass(); - numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size(); + numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size(); } @AfterClass @@ -101,12 +101,12 @@ public class TestReplicationEndpoint extends TestReplicationBase { ReplicationEndpointReturningFalse.replicated.set(false); ReplicationEndpointForTest.lastEntries = null; final List rsThreads = - utility1.getMiniHBaseCluster().getRegionServerThreads(); + UTIL1.getMiniHBaseCluster().getRegionServerThreads(); for (RegionServerThread rs : rsThreads) { - utility1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName()); + UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName()); } // Wait for all log roll to finish - utility1.waitFor(3000, new Waiter.ExplainingPredicate() { + UTIL1.waitFor(3000, new Waiter.ExplainingPredicate() { @Override public boolean evaluate() throws Exception { for (RegionServerThread rs : rsThreads) { @@ -134,18 +134,18 @@ public class TestReplicationEndpoint extends TestReplicationBase { public void testCustomReplicationEndpoint() throws Exception { // test installing a custom replication endpoint other than the default one. admin.addPeer("testCustomReplicationEndpoint", - new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null); // check whether the class has been constructed and started - Waiter.waitFor(conf1, 60000, new Waiter.Predicate() { + Waiter.waitFor(CONF1, 60000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers; } }); - Waiter.waitFor(conf1, 60000, new Waiter.Predicate() { + Waiter.waitFor(CONF1, 60000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { return ReplicationEndpointForTest.startedCount.get() >= numRegionServers; @@ -157,7 +157,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { // now replicate some data. doPut(Bytes.toBytes("row42")); - Waiter.waitFor(conf1, 60000, new Waiter.Predicate() { + Waiter.waitFor(CONF1, 60000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { return ReplicationEndpointForTest.replicateCount.get() >= 1; @@ -176,7 +176,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { int peerCount = admin.getPeersCount(); final String id = "testReplicationEndpointReturnsFalseOnReplicate"; admin.addPeer(id, - new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null); // This test is flakey and then there is so much stuff flying around in here its, hard to // debug. Peer needs to be up for the edit to make it across. This wait on @@ -188,7 +188,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { // now replicate some data doPut(row); - Waiter.waitFor(conf1, 60000, new Waiter.Predicate() { + Waiter.waitFor(CONF1, 60000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { // Looks like replication endpoint returns false unless we put more than 10 edits. We @@ -209,7 +209,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { public void testInterClusterReplication() throws Exception { final String id = "testInterClusterReplication"; - List regions = utility1.getHBaseCluster().getRegions(tableName); + List regions = UTIL1.getHBaseCluster().getRegions(tableName); int totEdits = 0; // Make sure edits are spread across regions because we do region based batching @@ -228,12 +228,12 @@ public class TestReplicationEndpoint extends TestReplicationBase { } admin.addPeer(id, - new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2)) + new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2)) .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()), null); final int numEdits = totEdits; - Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate() { + Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate() { @Override public boolean evaluate() throws Exception { return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits; @@ -248,26 +248,27 @@ public class TestReplicationEndpoint extends TestReplicationBase { }); admin.removePeer("testInterClusterReplication"); - utility1.deleteTableData(tableName); + UTIL1.deleteTableData(tableName); } @Test public void testWALEntryFilterFromReplicationEndpoint() throws Exception { - ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + ReplicationPeerConfig rpc = + new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); - //test that we can create mutliple WALFilters reflectively + // test that we can create mutliple WALFilters reflectively rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, - EverythingPassesWALEntryFilter.class.getName() + - "," + EverythingPassesWALEntryFilterSubclass.class.getName()); + EverythingPassesWALEntryFilter.class.getName() + "," + + EverythingPassesWALEntryFilterSubclass.class.getName()); admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc); // now replicate some data. - try (Connection connection = ConnectionFactory.createConnection(conf1)) { + try (Connection connection = ConnectionFactory.createConnection(CONF1)) { doPut(connection, Bytes.toBytes("row1")); doPut(connection, row); doPut(connection, Bytes.toBytes("row2")); } - Waiter.waitFor(conf1, 60000, new Waiter.Predicate() { + Waiter.waitFor(CONF1, 60000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { return ReplicationEndpointForTest.replicateCount.get() >= 1; @@ -280,37 +281,38 @@ public class TestReplicationEndpoint extends TestReplicationBase { admin.removePeer("testWALEntryFilterFromReplicationEndpoint"); } - @Test (expected=IOException.class) + @Test(expected = IOException.class) public void testWALEntryFilterAddValidation() throws Exception { - ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + ReplicationPeerConfig rpc = + new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); - //test that we can create mutliple WALFilters reflectively + // test that we can create mutliple WALFilters reflectively rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, - "IAmNotARealWalEntryFilter"); + "IAmNotARealWalEntryFilter"); admin.addPeer("testWALEntryFilterAddValidation", rpc); } - @Test (expected=IOException.class) + @Test(expected = IOException.class) public void testWALEntryFilterUpdateValidation() throws Exception { - ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + ReplicationPeerConfig rpc = + new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); - //test that we can create mutliple WALFilters reflectively + // test that we can create mutliple WALFilters reflectively rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, - "IAmNotARealWalEntryFilter"); + "IAmNotARealWalEntryFilter"); admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc); } - @Test - public void testMetricsSourceBaseSourcePassthrough(){ + public void testMetricsSourceBaseSourcePassthrough() { /* - The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl - and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. - Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which - allows for custom JMX metrics. - This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through - the two layers of wrapping to the actual BaseSource. - */ + * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a + * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of + * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows + * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on + * MetricsSource actually calls down through the two layers of wrapping to the actual + * BaseSource. + */ String id = "id"; DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class); MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class); @@ -318,15 +320,16 @@ public class TestReplicationEndpoint extends TestReplicationBase { MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class); when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry); - - MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id); - MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms); + MetricsReplicationSourceSource singleSourceSource = + new MetricsReplicationSourceSourceImpl(singleRms, id); + MetricsReplicationSourceSource globalSourceSource = + new MetricsReplicationGlobalSourceSource(globalRms); MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource); doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue(); Map singleSourceSourceByTable = new HashMap<>(); - MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource, - singleSourceSourceByTable); + MetricsSource source = + new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable); String gaugeName = "gauge"; @@ -388,7 +391,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { } private void doPut(byte[] row) throws IOException { - try (Connection connection = ConnectionFactory.createConnection(conf1)) { + try (Connection connection = ConnectionFactory.createConnection(CONF1)) { doPut(connection, row); } } @@ -413,7 +416,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { } public static class ReplicationEndpointForTest extends BaseReplicationEndpoint { - static UUID uuid = utility1.getRandomUUID(); + static UUID uuid = UTIL1.getRandomUUID(); static AtomicInteger contructedCount = new AtomicInteger(); static AtomicInteger startedCount = new AtomicInteger(); static AtomicInteger stoppedCount = new AtomicInteger(); @@ -562,7 +565,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { } } - public static class EverythingPassesWALEntryFilterSubclass extends EverythingPassesWALEntryFilter { - + public static class EverythingPassesWALEntryFilterSubclass + extends EverythingPassesWALEntryFilter { } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java index 41cc9bc8d0c..ae99eb897ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java @@ -37,6 +37,6 @@ public class TestReplicationKillMasterRS extends TestReplicationKillRS { @Test public void killOneMasterRS() throws Exception { - loadTableAndKillRS(utility1); + loadTableAndKillRS(UTIL1); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSCompressed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSCompressed.java index 6cbae83b9bc..e649149ca57 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSCompressed.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSCompressed.java @@ -41,7 +41,7 @@ public class TestReplicationKillMasterRSCompressed extends TestReplicationKillMa */ @BeforeClass public static void setUpBeforeClass() throws Exception { - conf1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); TestReplicationBase.setUpBeforeClass(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSWithSeparateOldWALs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSWithSeparateOldWALs.java index 108f2744e73..aa3aadde51a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSWithSeparateOldWALs.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSWithSeparateOldWALs.java @@ -36,12 +36,12 @@ public class TestReplicationKillMasterRSWithSeparateOldWALs extends TestReplicat @BeforeClass public static void setUpBeforeClass() throws Exception { - conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true); + CONF1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true); TestReplicationBase.setUpBeforeClass(); } @Test public void killOneMasterRS() throws Exception { - loadTableAndKillRS(utility1); + loadTableAndKillRS(UTIL1); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java index 5b4fa2af1c2..c2457267b28 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java @@ -57,10 +57,10 @@ public class TestReplicationKillRS extends TestReplicationBase { Thread killer = killARegionServer(util, 5000, rsToKill1); Result[] res; int initialCount; - try (Connection conn = ConnectionFactory.createConnection(conf1)) { + try (Connection conn = ConnectionFactory.createConnection(CONF1)) { try (Table table = conn.getTable(tableName)) { LOG.info("Start loading table"); - initialCount = utility1.loadTable(table, famName); + initialCount = UTIL1.loadTable(table, famName); LOG.info("Done loading table"); killer.join(5000); LOG.info("Done waiting for threads"); @@ -86,7 +86,7 @@ public class TestReplicationKillRS extends TestReplicationBase { int lastCount = 0; final long start = System.currentTimeMillis(); int i = 0; - try (Connection conn = ConnectionFactory.createConnection(conf2)) { + try (Connection conn = ConnectionFactory.createConnection(CONF2)) { try (Table table = conn.getTable(tableName)) { while (true) { if (i == NB_RETRIES - 1) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java index 96630b234fb..733fa3aa8d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java @@ -37,6 +37,6 @@ public class TestReplicationKillSlaveRS extends TestReplicationKillRS { @Test public void killOneSlaveRS() throws Exception { - loadTableAndKillRS(utility2); + loadTableAndKillRS(UTIL2); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRSWithSeparateOldWALs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRSWithSeparateOldWALs.java index a852b81c24a..abff3e2caf6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRSWithSeparateOldWALs.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRSWithSeparateOldWALs.java @@ -36,12 +36,12 @@ public class TestReplicationKillSlaveRSWithSeparateOldWALs extends TestReplicati @BeforeClass public static void setUpBeforeClass() throws Exception { - conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true); + CONF1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true); TestReplicationBase.setUpBeforeClass(); } @Test public void killOneSlaveRS() throws Exception { - loadTableAndKillRS(utility2); + loadTableAndKillRS(UTIL2); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java index 8ff4d84dcd7..c646a9011c3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java @@ -42,7 +42,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase { @Test public void testReplicationMetrics() throws Exception { - try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) { + try (Admin hbaseAdmin = UTIL1.getConnection().getAdmin()) { Put p = new Put(Bytes.toBytes("starter")); p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay")); htable1.put(p); @@ -52,7 +52,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase { } // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp Thread.sleep(5000); - HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName); + HRegionServer rs = UTIL1.getRSForFirstRegionInTable(tableName); Map metrics = rs.getWalGroupsReplicationStatus(); Assert.assertEquals("metric size ", 1, metrics.size()); long lastPosition = 0; @@ -72,7 +72,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase { .size() == 0) { Thread.sleep(500); } - rs = utility1.getRSForFirstRegionInTable(tableName); + rs = UTIL1.getRSForFirstRegionInTable(tableName); metrics = rs.getWalGroupsReplicationStatus(); Path lastPath = null; for (Map.Entry metric : metrics.entrySet()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 2c8dc4c0b7f..b8b96788f75 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -102,7 +102,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { final byte[] v1 = Bytes.toBytes("v1"); final byte[] v2 = Bytes.toBytes("v2"); final byte[] v3 = Bytes.toBytes("v3"); - htable1 = utility1.getConnection().getTable(tableName); + htable1 = UTIL1.getConnection().getTable(tableName); long t = EnvironmentEdgeManager.currentTime(); // create three versions for "row" @@ -265,7 +265,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { } } ReplicationPeerConfig rpc = - ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build(); + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build(); hbaseAdmin.addReplicationPeer(PEER_ID, rpc); Thread.sleep(SLEEP_TIME); rowKey = Bytes.toBytes("do rep"); @@ -363,7 +363,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { final String colFam = "cf1"; final int numOfTables = 3; - Admin hadmin = utility1.getAdmin(); + Admin hadmin = UTIL1.getAdmin(); // Create Tables for (int i = 0; i < numOfTables; i++) { @@ -408,15 +408,15 @@ public class TestReplicationSmallTests extends TestReplicationBase { public void testReplicationInReplay() throws Exception { final TableName tableName = htable1.getName(); - HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0); + HRegion region = UTIL1.getMiniHBaseCluster().getRegions(tableName).get(0); RegionInfo hri = region.getRegionInfo(); NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) { scopes.put(fam, 1); } final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); - int index = utility1.getMiniHBaseCluster().getServerWith(hri.getRegionName()); - WAL wal = utility1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo()); + int index = UTIL1.getMiniHBaseCluster().getServerWith(hri.getRegionName()); + WAL wal = UTIL1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo()); final byte[] rowName = Bytes.toBytes("testReplicationInReplay"); final byte[] qualifier = Bytes.toBytes("q"); final byte[] value = Bytes.toBytes("v"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java index a305b665f22..7eddc5c5c9b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java @@ -54,7 +54,7 @@ public class TestReplicationStatus extends TestReplicationBase { */ @Test public void testReplicationStatus() throws Exception { - Admin hbaseAdmin = utility1.getAdmin(); + Admin hbaseAdmin = UTIL1.getAdmin(); // disable peer hbaseAdmin.disableReplicationPeer(PEER_ID2); @@ -69,7 +69,7 @@ public class TestReplicationStatus extends TestReplicationBase { ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)); - for (JVMClusterUtil.RegionServerThread thread : utility1.getHBaseCluster() + for (JVMClusterUtil.RegionServerThread thread : UTIL1.getHBaseCluster() .getRegionServerThreads()) { ServerName server = thread.getRegionServer().getServerName(); ServerMetrics sm = metrics.getLiveServerMetrics().get(server); @@ -88,10 +88,10 @@ public class TestReplicationStatus extends TestReplicationBase { } // Stop rs1, then the queue of rs1 will be transfered to rs0 - utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer"); + UTIL1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer"); Thread.sleep(10000); metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)); - ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName(); + ServerName server = UTIL1.getHBaseCluster().getRegionServer(0).getServerName(); ServerMetrics sm = metrics.getLiveServerMetrics().get(server); List rLoadSourceList = sm.getReplicationLoadSourceList(); // check SourceList still only has one entry diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java index 0ec98e92aaa..edeaf9d2644 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java @@ -44,18 +44,18 @@ public class TestReplicationStatusAfterLagging extends TestReplicationBase { @Test public void testReplicationStatusAfterLagging() throws Exception { - utility2.shutdownMiniHBaseCluster(); - restartHBaseCluster(utility1, 1); + UTIL2.shutdownMiniHBaseCluster(); + restartHBaseCluster(UTIL1, 1); // add some values to cluster 1 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { Put p = new Put(Bytes.toBytes("row" + i)); p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i)); htable1.put(p); } - utility2.startMiniHBaseCluster(); + UTIL2.startMiniHBaseCluster(); Thread.sleep(10000); - Admin hbaseAdmin = utility1.getAdmin(); - ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName(); + Admin hbaseAdmin = UTIL1.getAdmin(); + ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName(); ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)); List loadSources = metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java index 75255bbe922..16d3822b321 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java @@ -44,7 +44,7 @@ public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestRepli @Test public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception { - utility2.shutdownMiniHBaseCluster(); + UTIL2.shutdownMiniHBaseCluster(); // add some values to cluster 1 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { Put p = new Put(Bytes.toBytes("row" + i)); @@ -52,9 +52,9 @@ public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestRepli htable1.put(p); } Thread.sleep(10000); - restartHBaseCluster(utility1, 1); - Admin hbaseAdmin = utility1.getAdmin(); - ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName(); + restartHBaseCluster(UTIL1, 1); + Admin hbaseAdmin = UTIL1.getAdmin(); + ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName(); Thread.sleep(10000); // add more values to cluster 1, these should cause normal queue to lag for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java index a8f266ad24b..6deb095acdb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java @@ -45,9 +45,9 @@ public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestRe @Test public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception { - utility2.shutdownMiniHBaseCluster(); - restartHBaseCluster(utility1, 1); - Admin hbaseAdmin = utility1.getAdmin(); + UTIL2.shutdownMiniHBaseCluster(); + restartHBaseCluster(UTIL1, 1); + Admin hbaseAdmin = UTIL1.getAdmin(); // add some values to source cluster for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { Put p = new Put(Bytes.toBytes("row" + i)); @@ -55,7 +55,7 @@ public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestRe htable1.put(p); } Thread.sleep(10000); - ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName(); + ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName(); ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)); List loadSources = metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java index 6625eb5278d..01f49f4fb0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java @@ -42,10 +42,10 @@ public class TestReplicationStatusSourceStartedTargetStoppedNoOps extends TestRe @Test public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception { - utility2.shutdownMiniHBaseCluster(); - restartHBaseCluster(utility1, 1); - Admin hbaseAdmin = utility1.getAdmin(); - ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName(); + UTIL2.shutdownMiniHBaseCluster(); + restartHBaseCluster(UTIL1, 1); + Admin hbaseAdmin = UTIL1.getAdmin(); + ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName(); Thread.sleep(10000); ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)); List loadSources = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java index 0b5f6e83838..fde87bc6b03 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java @@ -46,7 +46,7 @@ public class TestReplicationStatusSourceStartedTargetStoppedWithRecovery @Test public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception { - utility2.shutdownMiniHBaseCluster(); + UTIL2.shutdownMiniHBaseCluster(); // add some values to cluster 1 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { Put p = new Put(Bytes.toBytes("row" + i)); @@ -54,9 +54,9 @@ public class TestReplicationStatusSourceStartedTargetStoppedWithRecovery htable1.put(p); } Thread.sleep(10000); - restartHBaseCluster(utility1, 1); - Admin hbaseAdmin = utility1.getAdmin(); - ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName(); + restartHBaseCluster(UTIL1, 1); + Admin hbaseAdmin = UTIL1.getAdmin(); + ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName(); Thread.sleep(10000); ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)); List loadSources = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java index 745c4391681..c3bbca953f6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java @@ -17,30 +17,20 @@ */ package org.apache.hadoop.hbase.replication; +import static org.apache.hadoop.hbase.HBaseTestingUtility.countRows; +import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES; +import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH; +import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME; import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.ToolRunner; -import org.junit.After; -import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -48,183 +38,56 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Category({ ReplicationTests.class, LargeTests.class }) -public class TestReplicationSyncUpTool extends TestReplicationBase { +public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class); + HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class); private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class); - private static final TableName t1_su = TableName.valueOf("t1_syncup"); - private static final TableName t2_su = TableName.valueOf("t2_syncup"); - - protected static final byte[] famName = Bytes.toBytes("cf1"); - private static final byte[] qualName = Bytes.toBytes("q1"); - - protected static final byte[] noRepfamName = Bytes.toBytes("norep"); - - private HTableDescriptor t1_syncupSource, t1_syncupTarget; - private HTableDescriptor t2_syncupSource, t2_syncupTarget; - - protected Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1; - - @Before - public void setUp() throws Exception { - HColumnDescriptor fam; - - t1_syncupSource = new HTableDescriptor(t1_su); - fam = new HColumnDescriptor(famName); - fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); - t1_syncupSource.addFamily(fam); - fam = new HColumnDescriptor(noRepfamName); - t1_syncupSource.addFamily(fam); - - t1_syncupTarget = new HTableDescriptor(t1_su); - fam = new HColumnDescriptor(famName); - t1_syncupTarget.addFamily(fam); - fam = new HColumnDescriptor(noRepfamName); - t1_syncupTarget.addFamily(fam); - - t2_syncupSource = new HTableDescriptor(t2_su); - fam = new HColumnDescriptor(famName); - fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); - t2_syncupSource.addFamily(fam); - fam = new HColumnDescriptor(noRepfamName); - t2_syncupSource.addFamily(fam); - - t2_syncupTarget = new HTableDescriptor(t2_su); - fam = new HColumnDescriptor(famName); - t2_syncupTarget.addFamily(fam); - fam = new HColumnDescriptor(noRepfamName); - t2_syncupTarget.addFamily(fam); - } - - @After - public void tearDownBase() throws Exception { - // Do nothing, just replace the super tearDown. because the super tearDown will use the - // out-of-data HBase admin to remove replication peer, which will be result in failure. - } - /** - * Add a row to a table in each cluster, check it's replicated, delete it, - * check's gone Also check the puts and deletes are not replicated back to - * the originating cluster. + * Add a row to a table in each cluster, check it's replicated, delete it, check's gone Also check + * the puts and deletes are not replicated back to the originating cluster. */ @Test public void testSyncUpTool() throws Exception { /** - * Set up Replication: on Master and one Slave - * Table: t1_syncup and t2_syncup - * columnfamily: - * 'cf1' : replicated - * 'norep': not replicated + * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily: + * 'cf1' : replicated 'norep': not replicated */ setupReplication(); /** - * at Master: - * t1_syncup: put 100 rows into cf1, and 1 rows into norep - * t2_syncup: put 200 rows into cf1, and 1 rows into norep - * - * verify correctly replicated to slave + * at Master: t1_syncup: put 100 rows into cf1, and 1 rows into norep t2_syncup: put 200 rows + * into cf1, and 1 rows into norep verify correctly replicated to slave */ putAndReplicateRows(); /** - * Verify delete works - * - * step 1: stop hbase on Slave - * - * step 2: at Master: - * t1_syncup: delete 50 rows from cf1 - * t2_syncup: delete 100 rows from cf1 - * no change on 'norep' - * - * step 3: stop hbase on master, restart hbase on Slave - * - * step 4: verify Slave still have the rows before delete - * t1_syncup: 100 rows from cf1 - * t2_syncup: 200 rows from cf1 - * - * step 5: run syncup tool on Master - * - * step 6: verify that delete show up on Slave - * t1_syncup: 50 rows from cf1 - * t2_syncup: 100 rows from cf1 - * - * verify correctly replicated to Slave + * Verify delete works step 1: stop hbase on Slave step 2: at Master: t1_syncup: delete 50 rows + * from cf1 t2_syncup: delete 100 rows from cf1 no change on 'norep' step 3: stop hbase on + * master, restart hbase on Slave step 4: verify Slave still have the rows before delete + * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step 5: run syncup tool on Master + * step 6: verify that delete show up on Slave t1_syncup: 50 rows from cf1 t2_syncup: 100 rows + * from cf1 verify correctly replicated to Slave */ mimicSyncUpAfterDelete(); /** - * Verify put works - * - * step 1: stop hbase on Slave - * - * step 2: at Master: - * t1_syncup: put 100 rows from cf1 - * t2_syncup: put 200 rows from cf1 - * and put another row on 'norep' - * ATTN: put to 'cf1' will overwrite existing rows, so end count will - * be 100 and 200 respectively - * put to 'norep' will add a new row. - * - * step 3: stop hbase on master, restart hbase on Slave - * - * step 4: verify Slave still has the rows before put - * t1_syncup: 50 rows from cf1 - * t2_syncup: 100 rows from cf1 - * - * step 5: run syncup tool on Master - * - * step 6: verify that put show up on Slave - * and 'norep' does not - * t1_syncup: 100 rows from cf1 - * t2_syncup: 200 rows from cf1 - * - * verify correctly replicated to Slave + * Verify put works step 1: stop hbase on Slave step 2: at Master: t1_syncup: put 100 rows from + * cf1 t2_syncup: put 200 rows from cf1 and put another row on 'norep' ATTN: put to 'cf1' will + * overwrite existing rows, so end count will be 100 and 200 respectively put to 'norep' will + * add a new row. step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave + * still has the rows before put t1_syncup: 50 rows from cf1 t2_syncup: 100 rows from cf1 step + * 5: run syncup tool on Master step 6: verify that put show up on Slave and 'norep' does not + * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 verify correctly replicated to + * Slave */ mimicSyncUpAfterPut(); } - protected void setupReplication() throws Exception { - ReplicationAdmin admin1 = new ReplicationAdmin(conf1); - ReplicationAdmin admin2 = new ReplicationAdmin(conf2); - - Admin ha = utility1.getAdmin(); - ha.createTable(t1_syncupSource); - ha.createTable(t2_syncupSource); - ha.close(); - - ha = utility2.getAdmin(); - ha.createTable(t1_syncupTarget); - ha.createTable(t2_syncupTarget); - ha.close(); - - Connection connection1 = ConnectionFactory.createConnection(utility1.getConfiguration()); - Connection connection2 = ConnectionFactory.createConnection(utility2.getConfiguration()); - - // Get HTable from Master - ht1Source = connection1.getTable(t1_su); - ht2Source = connection1.getTable(t2_su); - - // Get HTable from Peer1 - ht1TargetAtPeer1 = connection2.getTable(t1_su); - ht2TargetAtPeer1 = connection2.getTable(t2_su); - - /** - * set M-S : Master: utility1 Slave1: utility2 - */ - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(utility2.getClusterKey()); - admin1.addPeer("1", rpc, null); - - admin1.close(); - admin2.close(); - } - private void putAndReplicateRows() throws Exception { LOG.debug("putAndReplicateRows"); // add rows to Master cluster, @@ -233,46 +96,46 @@ public class TestReplicationSyncUpTool extends TestReplicationBase { // 100 + 1 row to t1_syncup for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { p = new Put(Bytes.toBytes("row" + i)); - p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); + p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i)); ht1Source.put(p); } p = new Put(Bytes.toBytes("row" + 9999)); - p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999)); + p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999)); ht1Source.put(p); // 200 + 1 row to t2_syncup for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) { p = new Put(Bytes.toBytes("row" + i)); - p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); + p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i)); ht2Source.put(p); } p = new Put(Bytes.toBytes("row" + 9999)); - p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999)); + p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999)); ht2Source.put(p); // ensure replication completed Thread.sleep(SLEEP_TIME); - int rowCount_ht1Source = utility1.countRows(ht1Source); + int rowCountHt1Source = countRows(ht1Source); for (int i = 0; i < NB_RETRIES; i++) { - int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); - if (i==NB_RETRIES-1) { - assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1, - rowCount_ht1TargetAtPeer1); + int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1); + if (i == NB_RETRIES - 1) { + assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCountHt1Source - 1, + rowCountHt1TargetAtPeer1); } - if (rowCount_ht1Source - 1 == rowCount_ht1TargetAtPeer1) { + if (rowCountHt1Source - 1 == rowCountHt1TargetAtPeer1) { break; } Thread.sleep(SLEEP_TIME); } - int rowCount_ht2Source = utility1.countRows(ht2Source); + int rowCountHt2Source = countRows(ht2Source); for (int i = 0; i < NB_RETRIES; i++) { - int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); - if (i==NB_RETRIES-1) { - assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1, - rowCount_ht2TargetAtPeer1); + int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1); + if (i == NB_RETRIES - 1) { + assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCountHt2Source - 1, + rowCountHt2TargetAtPeer1); } - if (rowCount_ht2Source - 1 == rowCount_ht2TargetAtPeer1) { + if (rowCountHt2Source - 1 == rowCountHt2TargetAtPeer1) { break; } Thread.sleep(SLEEP_TIME); @@ -281,7 +144,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase { private void mimicSyncUpAfterDelete() throws Exception { LOG.debug("mimicSyncUpAfterDelete"); - utility2.shutdownMiniHBaseCluster(); + UTIL2.shutdownMiniHBaseCluster(); List list = new ArrayList<>(); // delete half of the rows @@ -299,51 +162,50 @@ public class TestReplicationSyncUpTool extends TestReplicationBase { } ht2Source.delete(list); - int rowCount_ht1Source = utility1.countRows(ht1Source); + int rowCount_ht1Source = countRows(ht1Source); assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51, rowCount_ht1Source); - int rowCount_ht2Source = utility1.countRows(ht2Source); - assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", - 101, rowCount_ht2Source); + int rowCount_ht2Source = countRows(ht2Source); + assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101, + rowCount_ht2Source); - utility1.shutdownMiniHBaseCluster(); - utility2.restartHBaseCluster(1); + UTIL1.shutdownMiniHBaseCluster(); + UTIL2.restartHBaseCluster(1); Thread.sleep(SLEEP_TIME); // before sync up - int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); - int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); - assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1); - assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1); + int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1); + int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1); + assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1); + assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1); // After sync up for (int i = 0; i < NB_RETRIES; i++) { - syncUp(utility1); - rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); - rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); + syncUp(UTIL1); + rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1); + rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1); if (i == NB_RETRIES - 1) { - if (rowCount_ht1TargetAtPeer1 != 50 || rowCount_ht2TargetAtPeer1 != 100) { + if (rowCountHt1TargetAtPeer1 != 50 || rowCountHt2TargetAtPeer1 != 100) { // syncUP still failed. Let's look at the source in case anything wrong there - utility1.restartHBaseCluster(1); - rowCount_ht1Source = utility1.countRows(ht1Source); + UTIL1.restartHBaseCluster(1); + rowCount_ht1Source = countRows(ht1Source); LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source); - rowCount_ht2Source = utility1.countRows(ht2Source); + rowCount_ht2Source = countRows(ht2Source); LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source); } assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50, - rowCount_ht1TargetAtPeer1); + rowCountHt1TargetAtPeer1); assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100, - rowCount_ht2TargetAtPeer1); + rowCountHt2TargetAtPeer1); } - if (rowCount_ht1TargetAtPeer1 == 50 && rowCount_ht2TargetAtPeer1 == 100) { + if (rowCountHt1TargetAtPeer1 == 50 && rowCountHt2TargetAtPeer1 == 100) { LOG.info("SyncUpAfterDelete succeeded at retry = " + i); break; } else { - LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" - + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" - + rowCount_ht2TargetAtPeer1); + LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" + + rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1); } Thread.sleep(SLEEP_TIME); } @@ -351,82 +213,77 @@ public class TestReplicationSyncUpTool extends TestReplicationBase { private void mimicSyncUpAfterPut() throws Exception { LOG.debug("mimicSyncUpAfterPut"); - utility1.restartHBaseCluster(1); - utility2.shutdownMiniHBaseCluster(); + UTIL1.restartHBaseCluster(1); + UTIL2.shutdownMiniHBaseCluster(); Put p; // another 100 + 1 row to t1_syncup // we should see 100 + 2 rows now for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { p = new Put(Bytes.toBytes("row" + i)); - p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); + p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i)); ht1Source.put(p); } p = new Put(Bytes.toBytes("row" + 9998)); - p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998)); + p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998)); ht1Source.put(p); // another 200 + 1 row to t1_syncup // we should see 200 + 2 rows now for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) { p = new Put(Bytes.toBytes("row" + i)); - p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); + p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i)); ht2Source.put(p); } p = new Put(Bytes.toBytes("row" + 9998)); - p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998)); + p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998)); ht2Source.put(p); - int rowCount_ht1Source = utility1.countRows(ht1Source); + int rowCount_ht1Source = countRows(ht1Source); assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source); - int rowCount_ht2Source = utility1.countRows(ht2Source); + int rowCount_ht2Source = countRows(ht2Source); assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source); - utility1.shutdownMiniHBaseCluster(); - utility2.restartHBaseCluster(1); + UTIL1.shutdownMiniHBaseCluster(); + UTIL2.restartHBaseCluster(1); Thread.sleep(SLEEP_TIME); // before sync up - int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); - int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); + int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1); + int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1); assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50, - rowCount_ht1TargetAtPeer1); + rowCountHt1TargetAtPeer1); assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100, - rowCount_ht2TargetAtPeer1); + rowCountHt2TargetAtPeer1); // after syun up for (int i = 0; i < NB_RETRIES; i++) { - syncUp(utility1); - rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); - rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); + syncUp(UTIL1); + rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1); + rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1); if (i == NB_RETRIES - 1) { - if (rowCount_ht1TargetAtPeer1 != 100 || rowCount_ht2TargetAtPeer1 != 200) { + if (rowCountHt1TargetAtPeer1 != 100 || rowCountHt2TargetAtPeer1 != 200) { // syncUP still failed. Let's look at the source in case anything wrong there - utility1.restartHBaseCluster(1); - rowCount_ht1Source = utility1.countRows(ht1Source); + UTIL1.restartHBaseCluster(1); + rowCount_ht1Source = countRows(ht1Source); LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source); - rowCount_ht2Source = utility1.countRows(ht2Source); + rowCount_ht2Source = countRows(ht2Source); LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source); } assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100, - rowCount_ht1TargetAtPeer1); + rowCountHt1TargetAtPeer1); assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200, - rowCount_ht2TargetAtPeer1); + rowCountHt2TargetAtPeer1); } - if (rowCount_ht1TargetAtPeer1 == 100 && rowCount_ht2TargetAtPeer1 == 200) { + if (rowCountHt1TargetAtPeer1 == 100 && rowCountHt2TargetAtPeer1 == 200) { LOG.info("SyncUpAfterPut succeeded at retry = " + i); break; } else { - LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" - + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" - + rowCount_ht2TargetAtPeer1); + LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" + + rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1); } Thread.sleep(SLEEP_TIME); } } - - protected void syncUp(HBaseTestingUtility ut) throws Exception { - ToolRunner.run(ut.getConfiguration(), new ReplicationSyncUp(), new String[0]); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java new file mode 100644 index 00000000000..bf3941d72ba --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.ToolRunner; +import org.junit.After; +import org.junit.Before; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +public abstract class TestReplicationSyncUpToolBase { + + protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility(); + protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility(); + + protected static final TableName TN1 = TableName.valueOf("t1_syncup"); + protected static final TableName TN2 = TableName.valueOf("t2_syncup"); + + protected static final byte[] FAMILY = Bytes.toBytes("cf1"); + protected static final byte[] QUALIFIER = Bytes.toBytes("q1"); + + protected static final byte[] NO_REP_FAMILY = Bytes.toBytes("norep"); + + protected TableDescriptor t1SyncupSource; + protected TableDescriptor t1SyncupTarget; + protected TableDescriptor t2SyncupSource; + protected TableDescriptor t2SyncupTarget; + + protected Connection conn1; + protected Connection conn2; + + protected Table ht1Source; + protected Table ht2Source; + protected Table ht1TargetAtPeer1; + protected Table ht2TargetAtPeer1; + + protected void customizeClusterConf(Configuration conf) { + } + + @Before + public void setUp() throws Exception { + customizeClusterConf(UTIL1.getConfiguration()); + customizeClusterConf(UTIL2.getConfiguration()); + TestReplicationBase.configureClusters(UTIL1, UTIL2); + UTIL1.startMiniZKCluster(); + UTIL2.setZkCluster(UTIL1.getZkCluster()); + + UTIL1.startMiniCluster(2); + UTIL2.startMiniCluster(4); + + t1SyncupSource = TableDescriptorBuilder.newBuilder(TN1) + .setColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build(); + + t1SyncupTarget = TableDescriptorBuilder.newBuilder(TN1) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build(); + + t2SyncupSource = TableDescriptorBuilder.newBuilder(TN2) + .setColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build(); + + t2SyncupTarget = TableDescriptorBuilder.newBuilder(TN2) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build(); + } + + @After + public void tearDown() throws Exception { + Closeables.close(ht1Source, true); + Closeables.close(ht2Source, true); + Closeables.close(ht1TargetAtPeer1, true); + Closeables.close(ht2TargetAtPeer1, true); + Closeables.close(conn1, true); + Closeables.close(conn2, true); + UTIL2.shutdownMiniCluster(); + UTIL1.shutdownMiniCluster(); + } + + protected final void setupReplication() throws Exception { + Admin admin1 = UTIL1.getAdmin(); + admin1.createTable(t1SyncupSource); + admin1.createTable(t2SyncupSource); + + Admin admin2 = UTIL2.getAdmin(); + admin2.createTable(t1SyncupTarget); + admin2.createTable(t2SyncupTarget); + + // Get HTable from Master + Connection conn1 = ConnectionFactory.createConnection(UTIL1.getConfiguration()); + ht1Source = conn1.getTable(TN1); + ht2Source = conn1.getTable(TN2); + + // Get HTable from Peer1 + Connection conn2 = ConnectionFactory.createConnection(UTIL2.getConfiguration()); + ht1TargetAtPeer1 = conn2.getTable(TN1); + ht2TargetAtPeer1 = conn2.getTable(TN2); + + /** + * set M-S : Master: utility1 Slave1: utility2 + */ + ReplicationPeerConfig rpc = + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build(); + admin1.addReplicationPeer("1", rpc); + } + + protected final void syncUp(HBaseTestingUtility util) throws Exception { + ToolRunner.run(util.getConfiguration(), new ReplicationSyncUp(), new String[0]); + } +} diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java similarity index 62% rename from hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java index eb575c5eb6b..5f10ef96285 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.replication; +import static org.apache.hadoop.hbase.HBaseTestingUtility.countRows; +import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES; +import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME; +import static org.apache.hadoop.hbase.replication.TestReplicationBase.row; import static org.junit.Assert.assertEquals; import java.io.IOException; @@ -26,6 +30,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -35,35 +40,34 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.tool.BulkLoadHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HFileTestUtil; -import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Category({ ReplicationTests.class, LargeTests.class }) -public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool { +public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class); + HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class); - private static final Logger LOG = LoggerFactory - .getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); - conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); - conf1.set("hbase.replication.source.fs.conf.provider", - TestSourceFSConfigurationProvider.class.getCanonicalName()); - TestReplicationBase.setUpBeforeClass(); - } + private static final Logger LOG = + LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class); @Override + protected void customizeClusterConf(Configuration conf) { + conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); + conf.set("hbase.replication.source.fs.conf.provider", + TestSourceFSConfigurationProvider.class.getCanonicalName()); + } + + @Test public void testSyncUpTool() throws Exception { /** * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily: @@ -77,7 +81,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication Iterator randomHFileRangeListIterator = null; Set randomHFileRanges = new HashSet<>(16); for (int i = 0; i < 16; i++) { - randomHFileRanges.add(utility1.getRandomUUID().toString()); + randomHFileRanges.add(UTIL1.getRandomUUID().toString()); } List randomHFileRangeList = new ArrayList<>(randomHFileRanges); Collections.sort(randomHFileRangeList); @@ -105,58 +109,58 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication private void mimicSyncUpAfterBulkLoad(Iterator randomHFileRangeListIterator) throws Exception { LOG.debug("mimicSyncUpAfterBulkLoad"); - utility2.shutdownMiniHBaseCluster(); + UTIL2.shutdownMiniHBaseCluster(); loadAndReplicateHFiles(false, randomHFileRangeListIterator); - int rowCount_ht1Source = utility1.countRows(ht1Source); + int rowCount_ht1Source = countRows(ht1Source); assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206, rowCount_ht1Source); - int rowCount_ht2Source = utility1.countRows(ht2Source); + int rowCount_ht2Source = countRows(ht2Source); assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406, rowCount_ht2Source); - utility1.shutdownMiniHBaseCluster(); - utility2.restartHBaseCluster(1); + UTIL1.shutdownMiniHBaseCluster(); + UTIL2.restartHBaseCluster(1); Thread.sleep(SLEEP_TIME); // Before sync up - int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); - int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); - assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1); - assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1); + int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1); + int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1); + assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1); + assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1); // Run sync up tool - syncUp(utility1); + syncUp(UTIL1); // After syun up for (int i = 0; i < NB_RETRIES; i++) { - syncUp(utility1); - rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); - rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); + syncUp(UTIL1); + rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1); + rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1); if (i == NB_RETRIES - 1) { - if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) { + if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) { // syncUP still failed. Let's look at the source in case anything wrong there - utility1.restartHBaseCluster(1); - rowCount_ht1Source = utility1.countRows(ht1Source); + UTIL1.restartHBaseCluster(1); + rowCount_ht1Source = countRows(ht1Source); LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source); - rowCount_ht2Source = utility1.countRows(ht2Source); + rowCount_ht2Source = countRows(ht2Source); LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source); } assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200, - rowCount_ht1TargetAtPeer1); + rowCountHt1TargetAtPeer1); assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400, - rowCount_ht2TargetAtPeer1); + rowCountHt2TargetAtPeer1); } - if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) { + if (rowCountHt1TargetAtPeer1 == 200 && rowCountHt2TargetAtPeer1 == 400) { LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i); break; } else { - LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" - + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" - + rowCount_ht2TargetAtPeer1); + LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + + ", with rowCount_ht1TargetPeer1 =" + rowCountHt1TargetAtPeer1 + + " and rowCount_ht2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1); } Thread.sleep(SLEEP_TIME); } @@ -168,44 +172,42 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication // Load 100 + 3 hfiles to t1_syncup. byte[][][] hfileRanges = - new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), - Bytes.toBytes(randomHFileRangeListIterator.next()) } }; - loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges, - 100); + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 100); hfileRanges = - new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), - Bytes.toBytes(randomHFileRangeListIterator.next()) } }; - loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source, + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source, hfileRanges, 3); // Load 200 + 3 hfiles to t2_syncup. hfileRanges = - new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), - Bytes.toBytes(randomHFileRangeListIterator.next()) } }; - loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges, - 200); + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 200); hfileRanges = - new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), - Bytes.toBytes(randomHFileRangeListIterator.next()) } }; - loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source, + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht2Source, hfileRanges, 3); if (verifyReplicationOnSlave) { // ensure replication completed - wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3, + wait(ht1TargetAtPeer1, countRows(ht1Source) - 3, "t1_syncup has 103 rows on source, and 100 on slave1"); - wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3, + wait(ht2TargetAtPeer1, countRows(ht2Source) - 3, "t2_syncup has 203 rows on source, and 200 on slave1"); } } private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam, Table source, byte[][][] hfileRanges, int numOfRows) throws Exception { - Path dir = utility1.getDataTestDirOnTestFS(testName); - FileSystem fs = utility1.getTestFileSystem(); + Path dir = UTIL1.getDataTestDirOnTestFS(testName); + FileSystem fs = UTIL1.getTestFileSystem(); dir = dir.makeQualified(fs); Path familyDir = new Path(dir, Bytes.toString(fam)); @@ -213,24 +215,23 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication for (byte[][] range : hfileRanges) { byte[] from = range[0]; byte[] to = range[1]; - HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_" - + hfileIdx++), fam, row, from, to, numOfRows); + HFileTestUtil.createHFile(UTIL1.getConfiguration(), fs, + new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows); } final TableName tableName = source.getName(); - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration()); - String[] args = { dir.toString(), tableName.toString() }; - loader.run(args); + BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration()); + loader.bulkLoad(tableName, dir); } - private void wait(Table target, int expectedCount, String msg) throws IOException, - InterruptedException { + private void wait(Table target, int expectedCount, String msg) + throws IOException, InterruptedException { for (int i = 0; i < NB_RETRIES; i++) { - int rowCount_ht2TargetAtPeer1 = utility2.countRows(target); + int rowCountHt2TargetAtPeer1 = countRows(target); if (i == NB_RETRIES - 1) { - assertEquals(msg, expectedCount, rowCount_ht2TargetAtPeer1); + assertEquals(msg, expectedCount, rowCountHt2TargetAtPeer1); } - if (expectedCount == rowCount_ht2TargetAtPeer1) { + if (expectedCount == rowCountHt2TargetAtPeer1) { break; } Thread.sleep(SLEEP_TIME); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java index 594aac0b5c9..2aa3ea4b0e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java @@ -36,8 +36,8 @@ public class TestReplicationEndpointWithMultipleAsyncWAL extends TestReplication @BeforeClass public static void setUpBeforeClass() throws Exception { - conf1.set(WALFactory.WAL_PROVIDER, "multiwal"); - conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs"); + CONF1.set(WALFactory.WAL_PROVIDER, "multiwal"); + CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs"); TestReplicationEndpoint.setUpBeforeClass(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java index 68b41be457b..36c07fd014d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java @@ -36,8 +36,8 @@ public class TestReplicationEndpointWithMultipleWAL extends TestReplicationEndpo @BeforeClass public static void setUpBeforeClass() throws Exception { - conf1.set(WALFactory.WAL_PROVIDER, "multiwal"); - conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem"); + CONF1.set(WALFactory.WAL_PROVIDER, "multiwal"); + CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem"); TestReplicationEndpoint.setUpBeforeClass(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java index 4685f24c0de..0f794928397 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java @@ -37,8 +37,8 @@ public class TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL extends @BeforeClass public static void setUpBeforeClass() throws Exception { - conf1.set(WALFactory.WAL_PROVIDER, "multiwal"); - conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs"); + CONF1.set(WALFactory.WAL_PROVIDER, "multiwal"); + CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs"); TestReplicationKillMasterRSCompressed.setUpBeforeClass(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java index 82fef3aa58b..21f325c1b16 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java @@ -37,8 +37,8 @@ public class TestReplicationKillMasterRSCompressedWithMultipleWAL extends @BeforeClass public static void setUpBeforeClass() throws Exception { - conf1.set(WALFactory.WAL_PROVIDER, "multiwal"); - conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem"); + CONF1.set(WALFactory.WAL_PROVIDER, "multiwal"); + CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem"); TestReplicationKillMasterRSCompressed.setUpBeforeClass(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java index 14514993470..b2835eee876 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java @@ -17,14 +17,13 @@ */ package org.apache.hadoop.hbase.replication.multiwal; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.replication.TestReplicationBase; import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.wal.RegionGroupingProvider; import org.apache.hadoop.hbase.wal.WALFactory; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.experimental.categories.Category; @@ -35,10 +34,9 @@ public class TestReplicationSyncUpToolWithMultipleAsyncWAL extends TestReplicati public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithMultipleAsyncWAL.class); - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conf1.set(WALFactory.WAL_PROVIDER, "multiwal"); - conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs"); - TestReplicationBase.setUpBeforeClass(); + @Override + protected void customizeClusterConf(Configuration conf) { + conf.set(WALFactory.WAL_PROVIDER, "multiwal"); + conf.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs"); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java index e487039dcd9..a5dbaf3f1c5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java @@ -17,14 +17,13 @@ */ package org.apache.hadoop.hbase.replication.multiwal; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.replication.TestReplicationBase; import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.wal.RegionGroupingProvider; import org.apache.hadoop.hbase.wal.WALFactory; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.experimental.categories.Category; @@ -33,12 +32,11 @@ public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyn @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithMultipleWAL.class); + HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithMultipleWAL.class); - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conf1.set(WALFactory.WAL_PROVIDER, "multiwal"); - conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem"); - TestReplicationBase.setUpBeforeClass(); + @Override + protected void customizeClusterConf(Configuration conf) { + conf.set(WALFactory.WAL_PROVIDER, "multiwal"); + conf.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem"); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java index 24329a0fff8..bff363f986e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java @@ -59,19 +59,19 @@ public class TestReplicator extends TestReplicationBase { @BeforeClass public static void setUpBeforeClass() throws Exception { // Set RPC size limit to 10kb (will be applied to both source and sink clusters) - conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10); + CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10); TestReplicationBase.setUpBeforeClass(); } @Test public void testReplicatorBatching() throws Exception { // Clear the tables - truncateTable(utility1, tableName); - truncateTable(utility2, tableName); + truncateTable(UTIL1, tableName); + truncateTable(UTIL2, tableName); // Replace the peer set up for us by the base class with a wrapper for this test admin.addPeer("testReplicatorBatching", - new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey()) + new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey()) .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null); @@ -92,7 +92,7 @@ public class TestReplicator extends TestReplicationBase { } // Wait for replication to complete. - Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate() { + Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate() { @Override public boolean evaluate() throws Exception { LOG.info("Count=" + ReplicationEndpointForTest.getBatchCount()); @@ -107,7 +107,7 @@ public class TestReplicator extends TestReplicationBase { assertEquals("We sent an incorrect number of batches", NUM_ROWS, ReplicationEndpointForTest.getBatchCount()); - assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2)); + assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2)); } finally { admin.removePeer("testReplicatorBatching"); } @@ -116,12 +116,12 @@ public class TestReplicator extends TestReplicationBase { @Test public void testReplicatorWithErrors() throws Exception { // Clear the tables - truncateTable(utility1, tableName); - truncateTable(utility2, tableName); + truncateTable(UTIL1, tableName); + truncateTable(UTIL2, tableName); // Replace the peer set up for us by the base class with a wrapper for this test admin.addPeer("testReplicatorWithErrors", - new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey()) + new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey()) .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()), null); @@ -143,7 +143,7 @@ public class TestReplicator extends TestReplicationBase { // Wait for replication to complete. // We can expect 10 batches - Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate() { + Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate() { @Override public boolean evaluate() throws Exception { return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS; @@ -155,7 +155,7 @@ public class TestReplicator extends TestReplicationBase { } }); - assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2)); + assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2)); } finally { admin.removePeer("testReplicatorWithErrors"); }