HBASE-22524 Refactor TestReplicationSyncUpTool

zhangduo 2019-06-03 20:47:32 +08:00 committed by Apache9
parent 7d9f79b93d
commit 26037854ad
33 changed files with 589 additions and 612 deletions

View File

@@ -27,8 +27,6 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -92,7 +90,7 @@ public class TestVerifyReplication extends TestReplicationBase {
@Before
public void setUp() throws Exception {
cleanUp();
utility2.deleteTableData(peerTableName);
UTIL2.deleteTableData(peerTableName);
}
@BeforeClass
@@ -103,7 +101,7 @@ public class TestVerifyReplication extends TestReplicationBase {
ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100)
.build()).build();
Connection connection2 = ConnectionFactory.createConnection(conf2);
Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(peerTable, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
@@ -112,7 +110,7 @@ public class TestVerifyReplication extends TestReplicationBase {
private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
throws IOException, InterruptedException, ClassNotFoundException {
Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args);
Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args);
if (job == null) {
fail("Job wasn't created, see the log");
}
@@ -174,24 +172,20 @@ public class TestVerifyReplication extends TestReplicationBase {
.setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
TableDescriptor table =
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build();
scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
scopes.put(f.getName(), f.getScope());
}
Connection connection1 = ConnectionFactory.createConnection(conf1);
Connection connection2 = ConnectionFactory.createConnection(conf2);
Connection connection1 = ConnectionFactory.createConnection(CONF1);
Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin1 = connection1.getAdmin()) {
admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
utility1.waitUntilAllRegionsAssigned(tableName);
utility2.waitUntilAllRegionsAssigned(tableName);
UTIL1.waitUntilAllRegionsAssigned(tableName);
UTIL2.waitUntilAllRegionsAssigned(tableName);
lHtable1 = utility1.getConnection().getTable(tableName);
lHtable2 = utility2.getConnection().getTable(tableName);
lHtable1 = UTIL1.getConnection().getTable(tableName);
lHtable2 = UTIL2.getConnection().getTable(tableName);
Put put = new Put(row);
put.addColumn(familyname, row, row);
@@ -442,30 +436,30 @@ public class TestVerifyReplication extends TestReplicationBase {
runSmallBatchTest();
// Take source and target tables snapshot
Path rootDir = FSUtils.getRootDir(conf1);
FileSystem fs = rootDir.getFileSystem(conf1);
Path rootDir = FSUtils.getRootDir(CONF1);
FileSystem fs = rootDir.getFileSystem(CONF1);
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
// Take target snapshot
Path peerRootDir = FSUtils.getRootDir(conf2);
FileSystem peerFs = peerRootDir.getFileSystem(conf2);
Path peerRootDir = FSUtils.getRootDir(CONF2);
FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
String peerFSAddress = peerFs.getUri().toString();
String temPath1 = utility1.getRandomDir().toString();
String temPath1 = UTIL1.getRandomDir().toString();
String temPath2 = "/tmp" + System.currentTimeMillis();
String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
"--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), "2", tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
checkRestoreTmpDir(conf1, temPath1, 1);
checkRestoreTmpDir(conf2, temPath2, 1);
checkRestoreTmpDir(CONF1, temPath1, 1);
checkRestoreTmpDir(CONF2, temPath2, 1);
Scan scan = new Scan();
ResultScanner rs = htable2.getScanner(scan);
@@ -481,20 +475,20 @@ public class TestVerifyReplication extends TestReplicationBase {
htable2.delete(delete);
sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
"--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), "2", tableName.getNameAsString() };
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
checkRestoreTmpDir(conf1, temPath1, 2);
checkRestoreTmpDir(conf2, temPath2, 2);
checkRestoreTmpDir(CONF1, temPath1, 2);
checkRestoreTmpDir(CONF2, temPath2, 2);
}
@Test
@@ -504,7 +498,7 @@ public class TestVerifyReplication extends TestReplicationBase {
runSmallBatchTest();
// with a quorum address (a cluster key)
String[] args = new String[] { utility2.getClusterKey(), tableName.getNameAsString() };
String[] args = new String[] { UTIL2.getClusterKey(), tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
Scan scan = new Scan();
@@ -529,31 +523,31 @@ public class TestVerifyReplication extends TestReplicationBase {
runSmallBatchTest();
// Take source and target tables snapshot
Path rootDir = FSUtils.getRootDir(conf1);
FileSystem fs = rootDir.getFileSystem(conf1);
Path rootDir = FSUtils.getRootDir(CONF1);
FileSystem fs = rootDir.getFileSystem(CONF1);
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
// Take target snapshot
Path peerRootDir = FSUtils.getRootDir(conf2);
FileSystem peerFs = peerRootDir.getFileSystem(conf2);
Path peerRootDir = FSUtils.getRootDir(CONF2);
FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
String peerFSAddress = peerFs.getUri().toString();
String tmpPath1 = utility1.getRandomDir().toString();
String tmpPath1 = UTIL1.getRandomDir().toString();
String tmpPath2 = "/tmp" + System.currentTimeMillis();
String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
"--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
checkRestoreTmpDir(conf1, tmpPath1, 1);
checkRestoreTmpDir(conf2, tmpPath2, 1);
checkRestoreTmpDir(CONF1, tmpPath1, 1);
checkRestoreTmpDir(CONF2, tmpPath2, 1);
Scan scan = new Scan();
ResultScanner rs = htable2.getScanner(scan);
@@ -569,21 +563,21 @@ public class TestVerifyReplication extends TestReplicationBase {
htable2.delete(delete);
sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
"--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
tableName.getNameAsString() };
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
checkRestoreTmpDir(conf1, tmpPath1, 2);
checkRestoreTmpDir(conf2, tmpPath2, 2);
checkRestoreTmpDir(CONF1, tmpPath1, 2);
checkRestoreTmpDir(CONF2, tmpPath2, 2);
}
private static void runBatchCopyTest() throws Exception {
@@ -621,10 +615,10 @@ public class TestVerifyReplication extends TestReplicationBase {
// with a peerTableName along with quorum address (a cluster key)
String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
utility2.getClusterKey(), tableName.getNameAsString() };
UTIL2.getClusterKey(), tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
utility2.deleteTableData(peerTableName);
UTIL2.deleteTableData(peerTableName);
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
}
@@ -634,32 +628,32 @@ public class TestVerifyReplication extends TestReplicationBase {
runBatchCopyTest();
// Take source and target tables snapshot
Path rootDir = FSUtils.getRootDir(conf1);
FileSystem fs = rootDir.getFileSystem(conf1);
Path rootDir = FSUtils.getRootDir(CONF1);
FileSystem fs = rootDir.getFileSystem(CONF1);
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
// Take target snapshot
Path peerRootDir = FSUtils.getRootDir(conf2);
FileSystem peerFs = peerRootDir.getFileSystem(conf2);
Path peerRootDir = FSUtils.getRootDir(CONF2);
FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName,
SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
String peerFSAddress = peerFs.getUri().toString();
String tmpPath1 = utility1.getRandomDir().toString();
String tmpPath1 = UTIL1.getRandomDir().toString();
String tmpPath2 = "/tmp" + System.currentTimeMillis();
String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
"--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
"--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
checkRestoreTmpDir(conf1, tmpPath1, 1);
checkRestoreTmpDir(conf2, tmpPath2, 1);
checkRestoreTmpDir(CONF1, tmpPath1, 1);
checkRestoreTmpDir(CONF2, tmpPath2, 1);
Scan scan = new Scan();
ResultScanner rs = htable3.getScanner(scan);
@@ -675,22 +669,22 @@ public class TestVerifyReplication extends TestReplicationBase {
htable3.delete(delete);
sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName,
SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
"--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
"--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
tableName.getNameAsString() };
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
checkRestoreTmpDir(conf1, tmpPath1, 2);
checkRestoreTmpDir(conf2, tmpPath2, 2);
checkRestoreTmpDir(CONF1, tmpPath1, 2);
checkRestoreTmpDir(CONF2, tmpPath2, 2);
}
@AfterClass
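
The hunk at @@ -112,7 +110,7 @@ above shows only the head of runVerifyReplication, which every test in this file funnels through. For orientation, a plausible completion of that helper is sketched below; the Verifier counter names (GOODROWS/BADROWS) are an assumption based on the VerifyReplication tool's mapper, not shown in this diff.

  private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    // Counter names assumed from VerifyReplication's Verifier mapper.
    assertEquals(expectedGoodRows,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(expectedBadRows,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
  }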

View File

@@ -74,11 +74,11 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TestReplicationBase.setUpBeforeClass();
connection1 = ConnectionFactory.createConnection(conf1);
connection2 = ConnectionFactory.createConnection(conf2);
connection1 = ConnectionFactory.createConnection(CONF1);
connection2 = ConnectionFactory.createConnection(CONF2);
admin1 = connection1.getAdmin();
admin2 = connection2.getAdmin();
adminExt = new ReplicationAdmin(conf1);
adminExt = new ReplicationAdmin(CONF1);
}
@AfterClass
@@ -199,8 +199,8 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
}
} finally {
utility1.deleteTable(tn);
utility2.deleteTable(tn);
UTIL1.deleteTable(tn);
UTIL2.deleteTable(tn);
}
}
@@ -273,7 +273,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
public void testReplicationPeerConfigUpdateCallback() throws Exception {
String peerId = "1";
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
rpc.setClusterKey(UTIL2.getClusterKey());
rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
rpc.getConfiguration().put("key1", "value1");
@@ -325,7 +325,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
@Override
public UUID getPeerUUID() {
return utility1.getRandomUUID();
return UTIL1.getRandomUUID();
}
@Override

View File

@@ -101,8 +101,8 @@ public class TestNamespaceReplication extends TestReplicationBase {
public static void setUpBeforeClass() throws Exception {
TestReplicationBase.setUpBeforeClass();
connection1 = ConnectionFactory.createConnection(conf1);
connection2 = ConnectionFactory.createConnection(conf2);
connection1 = ConnectionFactory.createConnection(CONF1);
connection2 = ConnectionFactory.createConnection(CONF2);
admin1 = connection1.getAdmin();
admin2 = connection2.getAdmin();

View File

@@ -24,18 +24,14 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -52,7 +48,6 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -72,22 +67,19 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
public class TestReplicationBase {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
protected static Configuration conf1 = HBaseConfiguration.create();
protected static Configuration conf2;
protected static Configuration CONF_WITH_LOCALFS;
protected static ZKWatcher zkw1;
protected static ZKWatcher zkw2;
protected static ReplicationAdmin admin;
protected static Admin hbaseAdmin;
protected static Table htable1;
protected static Table htable2;
protected static NavigableMap<byte[], Integer> scopes;
protected static HBaseTestingUtility utility1;
protected static HBaseTestingUtility utility2;
protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
protected static final Configuration CONF1 = UTIL1.getConfiguration();
protected static final Configuration CONF2 = UTIL2.getConfiguration();
protected static final int NUM_SLAVES1 = 2;
protected static final int NUM_SLAVES2 = 4;
protected static final int NB_ROWS_IN_BATCH = 100;
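
This hunk is the heart of the refactor: the mutable statics conf1/conf2/utility1/utility2 give way to final constants, so a test can no longer hold a Configuration that has drifted from its cluster. A side-by-side sketch of the pattern, drawn from the lines above:

  // Before: the configuration existed independently of the utility, so callers
  // had to remember to "reget" it after the utility rewrote ZK settings.
  protected static Configuration conf1 = HBaseConfiguration.create();
  protected static HBaseTestingUtility utility1; // assigned later in configureClusters()

  // After: the utility owns its configuration and CONF1 is just an alias for it,
  // so there is exactly one live Configuration per mini cluster.
  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
  protected static final Configuration CONF1 = UTIL1.getConfiguration();
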
@@ -113,12 +105,12 @@ public class TestReplicationBase {
protected final void cleanUp() throws IOException, InterruptedException {
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
.getRegionServerThreads()) {
utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
}
int rowCount = utility1.countRows(tableName);
utility1.deleteTableData(tableName);
int rowCount = UTIL1.countRows(tableName);
UTIL1.deleteTableData(tableName);
// truncating the table will send one Delete per row to the slave cluster
// in an async fashion, which is why we cannot just call deleteTableData on
// utility2 since late writes could make it to the slave in some way.
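
The comment above explains why the slave table cannot simply be truncated: the source cluster's Deletes arrive asynchronously, so the test must wait for the slave to converge. The remainder of cleanUp is elided by the diff context; its usual shape is roughly the sketch below, where NB_RETRIES and SLEEP_TIME are the base-class retry constants.

  Scan scan = new Scan();
  int lastCount = 0;
  for (int i = 0; i < NB_RETRIES; i++) {
    if (i == NB_RETRIES - 1) {
      fail("Waited too much time for truncate");
    }
    try (ResultScanner scanner = htable2.getScanner(scan)) {
      Result[] res = scanner.next(rowCount);
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // don't increment the timeout if we make progress
        }
        lastCount = res.length;
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
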
@@ -180,92 +172,85 @@ public class TestReplicationBase {
htable1.put(puts);
}
protected static void configureClusters(){
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
private static void setupConfig(HBaseTestingUtility util, String znodeParent) {
Configuration conf = util.getConfiguration();
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
// We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
// sufficient number of events. But we don't want to go too low because
// HBaseInterClusterReplicationEndpoint partitions entries into batches and we want
// more than one batch sent to the peer cluster for better testing.
conf1.setInt("replication.source.size.capacity", 102400);
conf1.setLong("replication.source.sleepforretries", 100);
conf1.setInt("hbase.regionserver.maxlogs", 10);
conf1.setLong("hbase.master.logcleaner.ttl", 10);
conf1.setInt("zookeeper.recovery.retry", 1);
conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setInt("replication.stats.thread.period.seconds", 5);
conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf1.setLong("replication.sleep.before.failover", 2000);
conf1.setInt("replication.source.maxretriesmultiplier", 10);
conf1.setFloat("replication.source.ratio", 1.0f);
conf1.setBoolean("replication.source.eof.autorecovery", true);
conf1.setLong("hbase.serial.replication.waiting.ms", 100);
conf.setInt("replication.source.size.capacity", 102400);
conf.setLong("replication.source.sleepforretries", 100);
conf.setInt("hbase.regionserver.maxlogs", 10);
conf.setLong("hbase.master.logcleaner.ttl", 10);
conf.setInt("zookeeper.recovery.retry", 1);
conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf.setInt("replication.stats.thread.period.seconds", 5);
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setLong("replication.sleep.before.failover", 2000);
conf.setInt("replication.source.maxretriesmultiplier", 10);
conf.setFloat("replication.source.ratio", 1.0f);
conf.setBoolean("replication.source.eof.autorecovery", true);
conf.setLong("hbase.serial.replication.waiting.ms", 100);
}
utility1 = new HBaseTestingUtility(conf1);
static void configureClusters(HBaseTestingUtility util1,
HBaseTestingUtility util2) {
setupConfig(util1, "/1");
setupConfig(util2, "/2");
// Base conf2 on conf1 so it gets the right zk cluster.
conf2 = HBaseConfiguration.create(conf1);
Configuration conf2 = util2.getConfiguration();
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
utility2 = new HBaseTestingUtility(conf2);
}
protected static void restartHBaseCluster(HBaseTestingUtility util, int numSlaves)
throws Exception {
util.shutdownMiniHBaseCluster();
util
.startMiniHBaseCluster(StartMiniClusterOption.builder().numRegionServers(numSlaves).build());
util.restartHBaseCluster(numSlaves);
}
protected static void startClusters() throws Exception{
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
// Have to reget conf1 in case zk cluster location different
// than default
conf1 = utility1.getConfiguration();
zkw1 = new ZKWatcher(conf1, "cluster1", null, true);
admin = new ReplicationAdmin(conf1);
protected static void startClusters() throws Exception {
UTIL1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
admin = new ReplicationAdmin(CONF1);
LOG.info("Setup first Zk");
utility2.setZkCluster(miniZK);
zkw2 = new ZKWatcher(conf2, "cluster2", null, true);
UTIL2.setZkCluster(miniZK);
LOG.info("Setup second Zk");
CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
utility1.startMiniCluster(NUM_SLAVES1);
CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
UTIL1.startMiniCluster(NUM_SLAVES1);
// Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
// as a component in deciding maximum number of parallel batches to send to the peer cluster.
utility2.startMiniCluster(NUM_SLAVES2);
UTIL2.startMiniCluster(NUM_SLAVES2);
hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin();
hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin();
TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
scopes.put(f.getName(), f.getScope());
}
Connection connection1 = ConnectionFactory.createConnection(conf1);
Connection connection2 = ConnectionFactory.createConnection(conf2);
Connection connection1 = ConnectionFactory.createConnection(CONF1);
Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin1 = connection1.getAdmin()) {
admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
utility1.waitUntilAllRegionsAssigned(tableName);
utility2.waitUntilAllRegionsAssigned(tableName);
UTIL1.waitUntilAllRegionsAssigned(tableName);
UTIL2.waitUntilAllRegionsAssigned(tableName);
htable1 = connection1.getTable(tableName);
htable2 = connection2.getTable(tableName);
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
configureClusters();
configureClusters(UTIL1, UTIL2);
startClusters();
}
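
One consequence of the now-final CONF1/CONF2 constants is worth spelling out: subclasses must finish mutating the configuration before the base class starts the clusters, since CONF1 is the live configuration of UTIL1. The established pattern, used verbatim by TestReplicationKillMasterRSCompressed further down, is:

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Mutate CONF1 first, then let the base class start both mini clusters.
    CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
    TestReplicationBase.setUpBeforeClass();
  }
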
@@ -277,9 +262,9 @@ public class TestReplicationBase {
public void setUpBase() throws Exception {
if (!peerExist(PEER_ID2)) {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
.setClusterKey(utility2.getClusterKey()).setSerial(isSerialPeer());
.setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer());
if (isSyncPeer()) {
FileSystem fs2 = utility2.getTestFileSystem();
FileSystem fs2 = UTIL2.getTestFileSystem();
// The remote wal dir is not important as we do not use it in DA state, here we only need to
// confirm that a sync peer in DA state can still replicate data to remote cluster
// asynchronously.
@@ -303,7 +288,7 @@ public class TestReplicationBase {
Put put = new Put(row);
put.addColumn(famName, row, row);
htable1 = utility1.getConnection().getTable(tableName);
htable1 = UTIL1.getConnection().getTable(tableName);
htable1.put(put);
Get get = new Get(row);
@@ -358,7 +343,7 @@ public class TestReplicationBase {
htable2.close();
htable1.close();
admin.close();
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
UTIL2.shutdownMiniCluster();
UTIL1.shutdownMiniCluster();
}
}

View File

@@ -88,11 +88,11 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
public void setUp() throws Exception {
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
.getRegionServerThreads()) {
utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
}
utility1.deleteTableData(tableName);
UTIL1.deleteTableData(tableName);
// truncating the table will send one Delete per row to the slave cluster
// in an async fashion, which is why we cannot just call deleteTableData on
// utility2 since late writes could make it to the slave in some way.
@@ -123,7 +123,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
@Test
public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
LOG.info("testSimplePutDelete");
MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
MiniHBaseCluster peerCluster = UTIL2.getMiniHBaseCluster();
int numRS = peerCluster.getRegionServerThreads().size();
doPutTest(Bytes.toBytes(1));
@@ -150,7 +150,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
put.addColumn(famName, row, row);
if (htable1 == null) {
htable1 = utility1.getConnection().getTable(tableName);
htable1 = UTIL1.getConnection().getTable(tableName);
}
htable1.put(put);

View File

@@ -54,7 +54,7 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase {
*/
@Test
public void testDisableInactivePeer() throws Exception {
utility2.shutdownMiniHBaseCluster();
UTIL2.shutdownMiniHBaseCluster();
byte[] rowkey = Bytes.toBytes("disable inactive peer");
Put put = new Put(rowkey);
@@ -67,7 +67,7 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase {
// disable and start the peer
admin.disablePeer("2");
StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();
utility2.startMiniHBaseCluster(option);
UTIL2.startMiniHBaseCluster(option);
Get get = new Get(rowkey);
for (int i = 0; i < NB_RETRIES; i++) {
Result res = htable2.get(get);
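
The verification loop is cut off at the hunk boundary above. Its expected shape, asserting that the row written while the peer was down and disabled never replicates, is roughly the following sketch (restating the two lines shown; NB_RETRIES and SLEEP_TIME are the base-class constants):

  for (int i = 0; i < NB_RETRIES; i++) {
    Result res = htable2.get(get);
    if (res.size() >= 1) {
      fail("Replication wasn't disabled");
    } else {
      LOG.info("Row not replicated, let's wait a bit more...");
      Thread.sleep(SLEEP_TIME);
    }
  }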

View File

@@ -63,14 +63,14 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
public void setUpBase() throws Exception {
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
.getRegionServerThreads()) {
utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
}
// Initialize the peer after wal rolling, so that we will abandon the stuck WALs.
super.setUpBase();
int rowCount = utility1.countRows(tableName);
utility1.deleteTableData(tableName);
int rowCount = UTIL1.countRows(tableName);
UTIL1.deleteTableData(tableName);
// truncating the table will send one Delete per row to the slave cluster
// in an async fashion, which is why we cannot just call deleteTableData on
// utility2 since late writes could make it to the slave in some way.
@@ -101,7 +101,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
// when replicate at sink side, it'll apply to rs group by table name, so the WAL of test table
// may apply first, and then test_dropped table, and we will believe that the replication is not
// got stuck (HBASE-20475).
conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
}
@Test
@@ -121,11 +121,11 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
@Test
public void testEditsDroppedWithDroppedTableNS() throws Exception {
// also try with a namespace
Connection connection1 = ConnectionFactory.createConnection(conf1);
Connection connection1 = ConnectionFactory.createConnection(CONF1);
try (Admin admin1 = connection1.getAdmin()) {
admin1.createNamespace(NamespaceDescriptor.create("NS").build());
}
Connection connection2 = ConnectionFactory.createConnection(conf2);
Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin2 = connection2.getAdmin()) {
admin2.createNamespace(NamespaceDescriptor.create("NS").build());
}
@@ -143,13 +143,13 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
}
private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception {
conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
// make sure we have a single region server only, so that all
// edits for all tables go there
utility1.shutdownMiniHBaseCluster();
utility1.startMiniHBaseCluster();
UTIL1.shutdownMiniHBaseCluster();
UTIL1.startMiniHBaseCluster();
TableName tablename = TableName.valueOf(tName);
byte[] familyName = Bytes.toBytes("fam");
@@ -161,16 +161,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
.newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.build();
Connection connection1 = ConnectionFactory.createConnection(conf1);
Connection connection2 = ConnectionFactory.createConnection(conf2);
Connection connection1 = ConnectionFactory.createConnection(CONF1);
Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin1 = connection1.getAdmin()) {
admin1.createTable(table);
}
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(table);
}
utility1.waitUntilAllRegionsAssigned(tablename);
utility2.waitUntilAllRegionsAssigned(tablename);
UTIL1.waitUntilAllRegionsAssigned(tablename);
UTIL2.waitUntilAllRegionsAssigned(tablename);
// now suspend replication
try (Admin admin1 = connection1.getAdmin()) {
@@ -213,18 +213,18 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
verifyReplicationStuck();
}
// just to be safe
conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
}
@Test
public void testEditsBehindDroppedTableTiming() throws Exception {
conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
// make sure we have a single region server only, so that all
// edits for all tables go there
utility1.shutdownMiniHBaseCluster();
utility1.startMiniHBaseCluster();
UTIL1.shutdownMiniHBaseCluster();
UTIL1.startMiniHBaseCluster();
TableName tablename = TableName.valueOf("testdroppedtimed");
byte[] familyName = Bytes.toBytes("fam");
@@ -236,16 +236,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
.newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.build();
Connection connection1 = ConnectionFactory.createConnection(conf1);
Connection connection2 = ConnectionFactory.createConnection(conf2);
Connection connection1 = ConnectionFactory.createConnection(CONF1);
Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin1 = connection1.getAdmin()) {
admin1.createTable(table);
}
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(table);
}
utility1.waitUntilAllRegionsAssigned(tablename);
utility2.waitUntilAllRegionsAssigned(tablename);
UTIL1.waitUntilAllRegionsAssigned(tablename);
UTIL2.waitUntilAllRegionsAssigned(tablename);
// now suspend replication
try (Admin admin1 = connection1.getAdmin()) {
@@ -290,7 +290,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
verifyReplicationProceeded();
}
// just to be safe
conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
}
private boolean peerHasAllNormalRows() throws IOException {
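
peerHasAllNormalRows is truncated here. A plausible body, consistent with how these tests scan the peer table, is sketched below; the count-only check is a simplification (the actual method may also compare row keys).

  private boolean peerHasAllNormalRows() throws IOException {
    try (ResultScanner scanner = htable2.getScanner(new Scan())) {
      // Count-based check only; a stricter version would compare row contents.
      Result[] results = scanner.next(NB_ROWS_IN_BATCH);
      return results.length == NB_ROWS_IN_BATCH;
    }
  }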

View File

@@ -55,16 +55,16 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
* @param numRs number of regionservers
*/
private void waitForLogAdvance(int numRs) throws Exception {
Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
for (int i = 0; i < numRs; i++) {
HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
RegionInfo regionInfo =
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
Replication replicationService = (Replication) utility1.getHBaseCluster()
Replication replicationService = (Replication) UTIL1.getHBaseCluster()
.getRegionServer(i).getReplicationSourceService();
for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
.getSources()) {
@@ -81,19 +81,19 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
@Test
public void testEmptyWALRecovery() throws Exception {
final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
// for each RS, create an empty wal with same walGroupId
final List<Path> emptyWalPaths = new ArrayList<>();
long ts = System.currentTimeMillis();
for (int i = 0; i < numRs; i++) {
RegionInfo regionInfo =
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
utility1.getTestFileSystem().create(emptyWalPath).close();
Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
UTIL1.getTestFileSystem().create(emptyWalPath).close();
emptyWalPaths.add(emptyWalPath);
}
@@ -102,12 +102,12 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
// determine if the file being replicated currently is still opened for write, so just inject a
// new wal to the replication queue does not mean the previous file is closed.
for (int i = 0; i < numRs; i++) {
HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
Replication replicationService = (Replication) hrs.getReplicationSourceService();
replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i));
replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
RegionInfo regionInfo =
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
wal.rollWriter(true);
}

View File

@@ -83,7 +83,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TestReplicationBase.setUpBeforeClass();
numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
}
@AfterClass
@@ -101,12 +101,12 @@ public class TestReplicationEndpoint extends TestReplicationBase {
ReplicationEndpointReturningFalse.replicated.set(false);
ReplicationEndpointForTest.lastEntries = null;
final List<RegionServerThread> rsThreads =
utility1.getMiniHBaseCluster().getRegionServerThreads();
UTIL1.getMiniHBaseCluster().getRegionServerThreads();
for (RegionServerThread rs : rsThreads) {
utility1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
}
// Wait for all log roll to finish
utility1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
for (RegionServerThread rs : rsThreads) {
@@ -134,18 +134,18 @@ public class TestReplicationEndpoint extends TestReplicationBase {
public void testCustomReplicationEndpoint() throws Exception {
// test installing a custom replication endpoint other than the default one.
admin.addPeer("testCustomReplicationEndpoint",
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
// check whether the class has been constructed and started
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
}
});
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
@@ -157,7 +157,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
// now replicate some data.
doPut(Bytes.toBytes("row42"));
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.replicateCount.get() >= 1;
@@ -176,7 +176,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
int peerCount = admin.getPeersCount();
final String id = "testReplicationEndpointReturnsFalseOnReplicate";
admin.addPeer(id,
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
// This test is flakey and then there is so much stuff flying around in here its, hard to
// debug. Peer needs to be up for the edit to make it across. This wait on
@@ -188,7 +188,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
// now replicate some data
doPut(row);
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
// Looks like replication endpoint returns false unless we put more than 10 edits. We
@@ -209,7 +209,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
public void testInterClusterReplication() throws Exception {
final String id = "testInterClusterReplication";
List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName);
List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
int totEdits = 0;
// Make sure edits are spread across regions because we do region based batching
@@ -228,12 +228,12 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
admin.addPeer(id,
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2))
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
null);
final int numEdits = totEdits;
Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() {
Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
@@ -248,26 +248,27 @@ public class TestReplicationEndpoint extends TestReplicationBase {
});
admin.removePeer("testInterClusterReplication");
utility1.deleteTableData(tableName);
UTIL1.deleteTableData(tableName);
}
@Test
public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
ReplicationPeerConfig rpc =
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
//test that we can create mutliple WALFilters reflectively
// test that we can create mutliple WALFilters reflectively
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
EverythingPassesWALEntryFilter.class.getName() +
"," + EverythingPassesWALEntryFilterSubclass.class.getName());
EverythingPassesWALEntryFilter.class.getName() + "," +
EverythingPassesWALEntryFilterSubclass.class.getName());
admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
// now replicate some data.
try (Connection connection = ConnectionFactory.createConnection(conf1)) {
try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
doPut(connection, Bytes.toBytes("row1"));
doPut(connection, row);
doPut(connection, Bytes.toBytes("row2"));
}
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.replicateCount.get() >= 1;
@@ -280,37 +281,38 @@ public class TestReplicationEndpoint extends TestReplicationBase {
admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
}
@Test (expected=IOException.class)
@Test(expected = IOException.class)
public void testWALEntryFilterAddValidation() throws Exception {
ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
ReplicationPeerConfig rpc =
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
//test that we can create mutliple WALFilters reflectively
// test that we can create mutliple WALFilters reflectively
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
"IAmNotARealWalEntryFilter");
"IAmNotARealWalEntryFilter");
admin.addPeer("testWALEntryFilterAddValidation", rpc);
}
@Test (expected=IOException.class)
@Test(expected = IOException.class)
public void testWALEntryFilterUpdateValidation() throws Exception {
ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
ReplicationPeerConfig rpc =
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
//test that we can create mutliple WALFilters reflectively
// test that we can create mutliple WALFilters reflectively
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
"IAmNotARealWalEntryFilter");
"IAmNotARealWalEntryFilter");
admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
}
@Test
public void testMetricsSourceBaseSourcePassthrough(){
public void testMetricsSourceBaseSourcePassthrough() {
/*
The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
allows for custom JMX metrics.
This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
the two layers of wrapping to the actual BaseSource.
*/
* The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
* MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
* those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
* for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
* MetricsSource actually calls down through the two layers of wrapping to the actual
* BaseSource.
*/
String id = "id";
DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
@@ -318,15 +320,16 @@ public class TestReplicationEndpoint extends TestReplicationBase {
MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
MetricsReplicationSourceSource singleSourceSource =
new MetricsReplicationSourceSourceImpl(singleRms, id);
MetricsReplicationSourceSource globalSourceSource =
new MetricsReplicationGlobalSourceSource(globalRms);
MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource,
singleSourceSourceByTable);
MetricsSource source =
new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
String gaugeName = "gauge";
@@ -388,7 +391,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
private void doPut(byte[] row) throws IOException {
try (Connection connection = ConnectionFactory.createConnection(conf1)) {
try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
doPut(connection, row);
}
}
@@ -413,7 +416,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
static UUID uuid = utility1.getRandomUUID();
static UUID uuid = UTIL1.getRandomUUID();
static AtomicInteger contructedCount = new AtomicInteger();
static AtomicInteger startedCount = new AtomicInteger();
static AtomicInteger stoppedCount = new AtomicInteger();
@@ -562,7 +565,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
}
public static class EverythingPassesWALEntryFilterSubclass extends EverythingPassesWALEntryFilter {
public static class EverythingPassesWALEntryFilterSubclass
extends EverythingPassesWALEntryFilter {
}
}
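
Across the three WALEntryFilter tests above, the wiring is identical and easy to miss amid the reindentation: every filter class is handed to the endpoint through one comma-separated configuration key on the peer config. Condensed from the hunks above (the peer id is arbitrary here):

  ReplicationPeerConfig rpc =
    new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
      .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
  // Multiple filters, instantiated reflectively, separated by commas:
  rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
    EverythingPassesWALEntryFilter.class.getName() + "," +
      EverythingPassesWALEntryFilterSubclass.class.getName());
  admin.addPeer("walEntryFilterDemo", rpc);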

View File

@@ -37,6 +37,6 @@ public class TestReplicationKillMasterRS extends TestReplicationKillRS {
@Test
public void killOneMasterRS() throws Exception {
loadTableAndKillRS(utility1);
loadTableAndKillRS(UTIL1);
}
}

View File

@@ -41,7 +41,7 @@ public class TestReplicationKillMasterRSCompressed extends TestReplicationKillMa
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
TestReplicationBase.setUpBeforeClass();
}
}

View File

@@ -36,12 +36,12 @@ public class TestReplicationKillMasterRSWithSeparateOldWALs extends TestReplicat
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
CONF1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
TestReplicationBase.setUpBeforeClass();
}
@Test
public void killOneMasterRS() throws Exception {
loadTableAndKillRS(utility1);
loadTableAndKillRS(UTIL1);
}
}

View File

@@ -57,10 +57,10 @@ public class TestReplicationKillRS extends TestReplicationBase {
Thread killer = killARegionServer(util, 5000, rsToKill1);
Result[] res;
int initialCount;
try (Connection conn = ConnectionFactory.createConnection(conf1)) {
try (Connection conn = ConnectionFactory.createConnection(CONF1)) {
try (Table table = conn.getTable(tableName)) {
LOG.info("Start loading table");
initialCount = utility1.loadTable(table, famName);
initialCount = UTIL1.loadTable(table, famName);
LOG.info("Done loading table");
killer.join(5000);
LOG.info("Done waiting for threads");
@@ -86,7 +86,7 @@ public class TestReplicationKillRS extends TestReplicationBase {
int lastCount = 0;
final long start = System.currentTimeMillis();
int i = 0;
try (Connection conn = ConnectionFactory.createConnection(conf2)) {
try (Connection conn = ConnectionFactory.createConnection(CONF2)) {
try (Table table = conn.getTable(tableName)) {
while (true) {
if (i == NB_RETRIES - 1) {
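
The catch-up loop is truncated at the hunk boundary above. Its customary shape in these tests is to rescan the peer table until it reaches initialCount, crediting any progress against the retry budget; approximately (a sketch only; res, initialCount, lastCount, start and i are the locals declared above):

  while (true) {
    if (i == NB_RETRIES - 1) {
      fail("Waited too much time for queueFailover replication. Waited " +
        (System.currentTimeMillis() - start) + "ms.");
    }
    try (ResultScanner scanner = table.getScanner(new Scan())) {
      res = scanner.next(initialCount);
    }
    if (res.length == initialCount) {
      break; // the slave caught up with everything loaded before the kill
    }
    if (res.length > lastCount) {
      i--; // don't burn a retry while we are making progress
    }
    lastCount = res.length;
    Thread.sleep(SLEEP_TIME);
    i++;
  }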

View File

@@ -37,6 +37,6 @@ public class TestReplicationKillSlaveRS extends TestReplicationKillRS {
@Test
public void killOneSlaveRS() throws Exception {
loadTableAndKillRS(utility2);
loadTableAndKillRS(UTIL2);
}
}

View File

@@ -36,12 +36,12 @@ public class TestReplicationKillSlaveRSWithSeparateOldWALs extends TestReplicati
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
CONF1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
TestReplicationBase.setUpBeforeClass();
}
@Test
public void killOneSlaveRS() throws Exception {
loadTableAndKillRS(utility2);
loadTableAndKillRS(UTIL2);
}
}

View File

@@ -42,7 +42,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase {
@Test
public void testReplicationMetrics() throws Exception {
try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
try (Admin hbaseAdmin = UTIL1.getConnection().getAdmin()) {
Put p = new Put(Bytes.toBytes("starter"));
p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
htable1.put(p);
@@ -52,7 +52,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase {
}
// sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
Thread.sleep(5000);
HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName);
HRegionServer rs = UTIL1.getRSForFirstRegionInTable(tableName);
Map<String, ReplicationStatus> metrics = rs.getWalGroupsReplicationStatus();
Assert.assertEquals("metric size ", 1, metrics.size());
long lastPosition = 0;
@@ -72,7 +72,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase {
.size() == 0) {
Thread.sleep(500);
}
rs = utility1.getRSForFirstRegionInTable(tableName);
rs = UTIL1.getRSForFirstRegionInTable(tableName);
metrics = rs.getWalGroupsReplicationStatus();
Path lastPath = null;
for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {

View File

@@ -102,7 +102,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
final byte[] v1 = Bytes.toBytes("v1");
final byte[] v2 = Bytes.toBytes("v2");
final byte[] v3 = Bytes.toBytes("v3");
htable1 = utility1.getConnection().getTable(tableName);
htable1 = UTIL1.getConnection().getTable(tableName);
long t = EnvironmentEdgeManager.currentTime();
// create three versions for "row"
@@ -265,7 +265,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
}
}
ReplicationPeerConfig rpc =
ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build();
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build();
hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
Thread.sleep(SLEEP_TIME);
rowKey = Bytes.toBytes("do rep");
@@ -363,7 +363,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
final String colFam = "cf1";
final int numOfTables = 3;
Admin hadmin = utility1.getAdmin();
Admin hadmin = UTIL1.getAdmin();
// Create Tables
for (int i = 0; i < numOfTables; i++) {
@@ -408,15 +408,15 @@ public class TestReplicationSmallTests extends TestReplicationBase {
public void testReplicationInReplay() throws Exception {
final TableName tableName = htable1.getName();
HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0);
HRegion region = UTIL1.getMiniHBaseCluster().getRegions(tableName).get(0);
RegionInfo hri = region.getRegionInfo();
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) {
scopes.put(fam, 1);
}
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
int index = utility1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
WAL wal = utility1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
int index = UTIL1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
WAL wal = UTIL1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
final byte[] rowName = Bytes.toBytes("testReplicationInReplay");
final byte[] qualifier = Bytes.toBytes("q");
final byte[] value = Bytes.toBytes("v");

View File

@@ -54,7 +54,7 @@ public class TestReplicationStatus extends TestReplicationBase {
*/
@Test
public void testReplicationStatus() throws Exception {
Admin hbaseAdmin = utility1.getAdmin();
Admin hbaseAdmin = UTIL1.getAdmin();
// disable peer
hbaseAdmin.disableReplicationPeer(PEER_ID2);
@@ -69,7 +69,7 @@ public class TestReplicationStatus extends TestReplicationBase {
ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
for (JVMClusterUtil.RegionServerThread thread : utility1.getHBaseCluster()
for (JVMClusterUtil.RegionServerThread thread : UTIL1.getHBaseCluster()
.getRegionServerThreads()) {
ServerName server = thread.getRegionServer().getServerName();
ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
@ -88,10 +88,10 @@ public class TestReplicationStatus extends TestReplicationBase {
}
// Stop rs1, then the queue of rs1 will be transferred to rs0
utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
UTIL1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
Thread.sleep(10000);
metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName();
ServerName server = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
List<ReplicationLoadSource> rLoadSourceList = sm.getReplicationLoadSourceList();
// check SourceList still only has one entry

View File

@ -44,18 +44,18 @@ public class TestReplicationStatusAfterLagging extends TestReplicationBase {
@Test
public void testReplicationStatusAfterLagging() throws Exception {
utility2.shutdownMiniHBaseCluster();
restartHBaseCluster(utility1, 1);
UTIL2.shutdownMiniHBaseCluster();
restartHBaseCluster(UTIL1, 1);
// add some values to cluster 1
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
htable1.put(p);
}
utility2.startMiniHBaseCluster();
UTIL2.startMiniHBaseCluster();
Thread.sleep(10000);
Admin hbaseAdmin = utility1.getAdmin();
ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
Admin hbaseAdmin = UTIL1.getAdmin();
ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
List<ReplicationLoadSource> loadSources =
metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();
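These status tests all read the same surface: cluster metrics for the live servers, then each server's replication source list. A minimal sketch of that probe, using only the API calls visible in the hunks above (the wrapper class name is illustrative, not part of the change):
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
// Illustrative wrapper around the query these status tests repeat.
final class ReplicationLagProbe {
static List<ReplicationLoadSource> loadSources(Admin admin, ServerName server)
throws IOException {
ClusterMetrics metrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
return metrics.getLiveServerMetrics().get(server).getReplicationLoadSourceList();
}
}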

View File

@ -44,7 +44,7 @@ public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestRepli
@Test
public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception {
utility2.shutdownMiniHBaseCluster();
UTIL2.shutdownMiniHBaseCluster();
// add some values to cluster 1
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
@ -52,9 +52,9 @@ public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestRepli
htable1.put(p);
}
Thread.sleep(10000);
restartHBaseCluster(utility1, 1);
Admin hbaseAdmin = utility1.getAdmin();
ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
restartHBaseCluster(UTIL1, 1);
Admin hbaseAdmin = UTIL1.getAdmin();
ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
Thread.sleep(10000);
// add more values to cluster 1, these should cause normal queue to lag
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {

View File

@ -45,9 +45,9 @@ public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestRe
@Test
public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
utility2.shutdownMiniHBaseCluster();
restartHBaseCluster(utility1, 1);
Admin hbaseAdmin = utility1.getAdmin();
UTIL2.shutdownMiniHBaseCluster();
restartHBaseCluster(UTIL1, 1);
Admin hbaseAdmin = UTIL1.getAdmin();
// add some values to source cluster
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
@ -55,7 +55,7 @@ public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestRe
htable1.put(p);
}
Thread.sleep(10000);
ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
List<ReplicationLoadSource> loadSources =
metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();

View File

@ -42,10 +42,10 @@ public class TestReplicationStatusSourceStartedTargetStoppedNoOps extends TestRe
@Test
public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
utility2.shutdownMiniHBaseCluster();
restartHBaseCluster(utility1, 1);
Admin hbaseAdmin = utility1.getAdmin();
ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
UTIL2.shutdownMiniHBaseCluster();
restartHBaseCluster(UTIL1, 1);
Admin hbaseAdmin = UTIL1.getAdmin();
ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
Thread.sleep(10000);
ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
List<ReplicationLoadSource> loadSources =

View File

@ -46,7 +46,7 @@ public class TestReplicationStatusSourceStartedTargetStoppedWithRecovery
@Test
public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception {
utility2.shutdownMiniHBaseCluster();
UTIL2.shutdownMiniHBaseCluster();
// add some values to cluster 1
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
@ -54,9 +54,9 @@ public class TestReplicationStatusSourceStartedTargetStoppedWithRecovery
htable1.put(p);
}
Thread.sleep(10000);
restartHBaseCluster(utility1, 1);
Admin hbaseAdmin = utility1.getAdmin();
ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
restartHBaseCluster(UTIL1, 1);
Admin hbaseAdmin = UTIL1.getAdmin();
ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
Thread.sleep(10000);
ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
List<ReplicationLoadSource> loadSources =

View File

@ -17,30 +17,20 @@
*/
package org.apache.hadoop.hbase.replication;
import static org.apache.hadoop.hbase.HBaseTestingUtility.countRows;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -48,183 +38,56 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpTool extends TestReplicationBase {
public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class);
HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class);
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);
private static final TableName t1_su = TableName.valueOf("t1_syncup");
private static final TableName t2_su = TableName.valueOf("t2_syncup");
protected static final byte[] famName = Bytes.toBytes("cf1");
private static final byte[] qualName = Bytes.toBytes("q1");
protected static final byte[] noRepfamName = Bytes.toBytes("norep");
private HTableDescriptor t1_syncupSource, t1_syncupTarget;
private HTableDescriptor t2_syncupSource, t2_syncupTarget;
protected Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
@Before
public void setUp() throws Exception {
HColumnDescriptor fam;
t1_syncupSource = new HTableDescriptor(t1_su);
fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
t1_syncupSource.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
t1_syncupSource.addFamily(fam);
t1_syncupTarget = new HTableDescriptor(t1_su);
fam = new HColumnDescriptor(famName);
t1_syncupTarget.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
t1_syncupTarget.addFamily(fam);
t2_syncupSource = new HTableDescriptor(t2_su);
fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
t2_syncupSource.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
t2_syncupSource.addFamily(fam);
t2_syncupTarget = new HTableDescriptor(t2_su);
fam = new HColumnDescriptor(famName);
t2_syncupTarget.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
t2_syncupTarget.addFamily(fam);
}
@After
public void tearDownBase() throws Exception {
// Do nothing, just replace the super tearDown, because the super tearDown would use the
// out-of-date HBase admin to remove the replication peer, which would result in failure.
}
/**
* Add a row to a table in each cluster, check it's replicated, delete it,
* check's gone Also check the puts and deletes are not replicated back to
* the originating cluster.
* Add a row to a table in each cluster, check it's replicated, delete it, and check it's gone.
* Also check that the puts and deletes are not replicated back to the originating cluster.
*/
@Test
public void testSyncUpTool() throws Exception {
/**
* Set up Replication: on Master and one Slave
* Table: t1_syncup and t2_syncup
* columnfamily:
* 'cf1' : replicated
* 'norep': not replicated
* Set up Replication: on Master and one Slave. Tables: t1_syncup and t2_syncup. Column family
* 'cf1' is replicated, 'norep' is not.
*/
setupReplication();
/**
* at Master:
* t1_syncup: put 100 rows into cf1, and 1 rows into norep
* t2_syncup: put 200 rows into cf1, and 1 rows into norep
*
* verify correctly replicated to slave
* At Master: t1_syncup: put 100 rows into cf1 and 1 row into norep; t2_syncup: put 200 rows
* into cf1 and 1 row into norep. Verify they are correctly replicated to the slave.
*/
putAndReplicateRows();
/**
* Verify delete works
*
* step 1: stop hbase on Slave
*
* step 2: at Master:
* t1_syncup: delete 50 rows from cf1
* t2_syncup: delete 100 rows from cf1
* no change on 'norep'
*
* step 3: stop hbase on master, restart hbase on Slave
*
* step 4: verify Slave still have the rows before delete
* t1_syncup: 100 rows from cf1
* t2_syncup: 200 rows from cf1
*
* step 5: run syncup tool on Master
*
* step 6: verify that delete show up on Slave
* t1_syncup: 50 rows from cf1
* t2_syncup: 100 rows from cf1
*
* verify correctly replicated to Slave
* Verify delete works. Step 1: stop hbase on Slave. Step 2: at Master, t1_syncup: delete 50
* rows from cf1; t2_syncup: delete 100 rows from cf1; no change on 'norep'. Step 3: stop hbase
* on Master, restart hbase on Slave. Step 4: verify Slave still has the rows from before the
* delete: t1_syncup: 100 rows from cf1; t2_syncup: 200 rows from cf1. Step 5: run the syncup
* tool on Master. Step 6: verify that the deletes show up on Slave: t1_syncup: 50 rows from
* cf1; t2_syncup: 100 rows from cf1. Verify they are correctly replicated to Slave.
*/
mimicSyncUpAfterDelete();
/**
* Verify put works
*
* step 1: stop hbase on Slave
*
* step 2: at Master:
* t1_syncup: put 100 rows from cf1
* t2_syncup: put 200 rows from cf1
* and put another row on 'norep'
* ATTN: put to 'cf1' will overwrite existing rows, so end count will
* be 100 and 200 respectively
* put to 'norep' will add a new row.
*
* step 3: stop hbase on master, restart hbase on Slave
*
* step 4: verify Slave still has the rows before put
* t1_syncup: 50 rows from cf1
* t2_syncup: 100 rows from cf1
*
* step 5: run syncup tool on Master
*
* step 6: verify that put show up on Slave
* and 'norep' does not
* t1_syncup: 100 rows from cf1
* t2_syncup: 200 rows from cf1
*
* verify correctly replicated to Slave
* Verify put works. Step 1: stop hbase on Slave. Step 2: at Master, t1_syncup: put 100 rows
* to cf1; t2_syncup: put 200 rows to cf1; and put another row to 'norep'. ATTN: puts to 'cf1'
* will overwrite existing rows, so the end counts will be 100 and 200 respectively; the put
* to 'norep' will add a new row. Step 3: stop hbase on Master, restart hbase on Slave.
* Step 4: verify Slave still has the rows from before the put: t1_syncup: 50 rows from cf1;
* t2_syncup: 100 rows from cf1. Step 5: run the syncup tool on Master. Step 6: verify that
* the puts show up on Slave and 'norep' does not: t1_syncup: 100 rows from cf1; t2_syncup:
* 200 rows from cf1. Verify they are correctly replicated to Slave.
*/
mimicSyncUpAfterPut();
}
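Both mimic* phases below follow the drill spelled out in the comments above. A condensed sketch of its shared shape, assuming it sits inside this test class so UTIL1, UTIL2, SLEEP_TIME and syncUp are in scope (the helper itself is hypothetical):
// Hypothetical helper showing the shared shape of mimicSyncUpAfterDelete/Put.
private void syncUpDrill(Runnable mutateMaster, Runnable verifySlave) throws Exception {
UTIL2.shutdownMiniHBaseCluster();  // step 1: stop hbase on Slave
mutateMaster.run();                // step 2: apply deletes/puts at Master
UTIL1.shutdownMiniHBaseCluster();  // step 3: stop Master ...
UTIL2.restartHBaseCluster(1);      // ... and restart the Slave
Thread.sleep(SLEEP_TIME);
// step 4: the Slave must still show the counts from before the mutation
syncUp(UTIL1);                     // step 5: ReplicationSyncUp replays the Master WALs
verifySlave.run();                 // step 6: the mutations are now visible on the Slave
}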
protected void setupReplication() throws Exception {
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
Admin ha = utility1.getAdmin();
ha.createTable(t1_syncupSource);
ha.createTable(t2_syncupSource);
ha.close();
ha = utility2.getAdmin();
ha.createTable(t1_syncupTarget);
ha.createTable(t2_syncupTarget);
ha.close();
Connection connection1 = ConnectionFactory.createConnection(utility1.getConfiguration());
Connection connection2 = ConnectionFactory.createConnection(utility2.getConfiguration());
// Get HTable from Master
ht1Source = connection1.getTable(t1_su);
ht2Source = connection1.getTable(t2_su);
// Get HTable from Peer1
ht1TargetAtPeer1 = connection2.getTable(t1_su);
ht2TargetAtPeer1 = connection2.getTable(t2_su);
/**
* set M-S : Master: utility1 Slave1: utility2
*/
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
admin1.addPeer("1", rpc, null);
admin1.close();
admin2.close();
}
private void putAndReplicateRows() throws Exception {
LOG.debug("putAndReplicateRows");
// add rows to Master cluster,
@ -233,46 +96,46 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
// 100 + 1 row to t1_syncup
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p = new Put(Bytes.toBytes("row" + i));
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
ht1Source.put(p);
}
p = new Put(Bytes.toBytes("row" + 9999));
p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
ht1Source.put(p);
// 200 + 1 row to t2_syncup
for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
p = new Put(Bytes.toBytes("row" + i));
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
ht2Source.put(p);
}
p = new Put(Bytes.toBytes("row" + 9999));
p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
ht2Source.put(p);
// ensure replication completed
Thread.sleep(SLEEP_TIME);
int rowCount_ht1Source = utility1.countRows(ht1Source);
int rowCountHt1Source = countRows(ht1Source);
for (int i = 0; i < NB_RETRIES; i++) {
int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
if (i==NB_RETRIES-1) {
assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1,
rowCount_ht1TargetAtPeer1);
int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
if (i == NB_RETRIES - 1) {
assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCountHt1Source - 1,
rowCountHt1TargetAtPeer1);
}
if (rowCount_ht1Source - 1 == rowCount_ht1TargetAtPeer1) {
if (rowCountHt1Source - 1 == rowCountHt1TargetAtPeer1) {
break;
}
Thread.sleep(SLEEP_TIME);
}
int rowCount_ht2Source = utility1.countRows(ht2Source);
int rowCountHt2Source = countRows(ht2Source);
for (int i = 0; i < NB_RETRIES; i++) {
int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
if (i==NB_RETRIES-1) {
assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1,
rowCount_ht2TargetAtPeer1);
int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
if (i == NB_RETRIES - 1) {
assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCountHt2Source - 1,
rowCountHt2TargetAtPeer1);
}
if (rowCount_ht2Source - 1 == rowCount_ht2TargetAtPeer1) {
if (rowCountHt2Source - 1 == rowCountHt2TargetAtPeer1) {
break;
}
Thread.sleep(SLEEP_TIME);
@ -281,7 +144,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
private void mimicSyncUpAfterDelete() throws Exception {
LOG.debug("mimicSyncUpAfterDelete");
utility2.shutdownMiniHBaseCluster();
UTIL2.shutdownMiniHBaseCluster();
List<Delete> list = new ArrayList<>();
// delete half of the rows
@ -299,51 +162,50 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
}
ht2Source.delete(list);
int rowCount_ht1Source = utility1.countRows(ht1Source);
int rowCount_ht1Source = countRows(ht1Source);
assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
rowCount_ht1Source);
int rowCount_ht2Source = utility1.countRows(ht2Source);
assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam",
101, rowCount_ht2Source);
int rowCount_ht2Source = countRows(ht2Source);
assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101,
rowCount_ht2Source);
utility1.shutdownMiniHBaseCluster();
utility2.restartHBaseCluster(1);
UTIL1.shutdownMiniHBaseCluster();
UTIL2.restartHBaseCluster(1);
Thread.sleep(SLEEP_TIME);
// before sync up
int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);
// After sync up
for (int i = 0; i < NB_RETRIES; i++) {
syncUp(utility1);
rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
syncUp(UTIL1);
rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
if (i == NB_RETRIES - 1) {
if (rowCount_ht1TargetAtPeer1 != 50 || rowCount_ht2TargetAtPeer1 != 100) {
if (rowCountHt1TargetAtPeer1 != 50 || rowCountHt2TargetAtPeer1 != 100) {
// syncUp still failed. Let's look at the source in case anything is wrong there
utility1.restartHBaseCluster(1);
rowCount_ht1Source = utility1.countRows(ht1Source);
UTIL1.restartHBaseCluster(1);
rowCount_ht1Source = countRows(ht1Source);
LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
rowCount_ht2Source = utility1.countRows(ht2Source);
rowCount_ht2Source = countRows(ht2Source);
LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
}
assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
rowCount_ht1TargetAtPeer1);
rowCountHt1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
rowCount_ht2TargetAtPeer1);
rowCountHt2TargetAtPeer1);
}
if (rowCount_ht1TargetAtPeer1 == 50 && rowCount_ht2TargetAtPeer1 == 100) {
if (rowCountHt1TargetAtPeer1 == 50 && rowCountHt2TargetAtPeer1 == 100) {
LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
break;
} else {
LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
+ rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
+ rowCount_ht2TargetAtPeer1);
LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCountHt1TargetAtPeer1 =" +
rowCountHt1TargetAtPeer1 + " and rowCountHt2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1);
}
Thread.sleep(SLEEP_TIME);
}
@ -351,82 +213,77 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
private void mimicSyncUpAfterPut() throws Exception {
LOG.debug("mimicSyncUpAfterPut");
utility1.restartHBaseCluster(1);
utility2.shutdownMiniHBaseCluster();
UTIL1.restartHBaseCluster(1);
UTIL2.shutdownMiniHBaseCluster();
Put p;
// another 100 + 1 row to t1_syncup
// we should see 100 + 2 rows now
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p = new Put(Bytes.toBytes("row" + i));
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
ht1Source.put(p);
}
p = new Put(Bytes.toBytes("row" + 9998));
p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
ht1Source.put(p);
// another 200 + 1 row to t2_syncup
// we should see 200 + 2 rows now
for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
p = new Put(Bytes.toBytes("row" + i));
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
ht2Source.put(p);
}
p = new Put(Bytes.toBytes("row" + 9998));
p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
ht2Source.put(p);
int rowCount_ht1Source = utility1.countRows(ht1Source);
int rowCount_ht1Source = countRows(ht1Source);
assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
int rowCount_ht2Source = utility1.countRows(ht2Source);
int rowCount_ht2Source = countRows(ht2Source);
assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
utility1.shutdownMiniHBaseCluster();
utility2.restartHBaseCluster(1);
UTIL1.shutdownMiniHBaseCluster();
UTIL2.restartHBaseCluster(1);
Thread.sleep(SLEEP_TIME);
// before sync up
int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
rowCount_ht1TargetAtPeer1);
rowCountHt1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
rowCount_ht2TargetAtPeer1);
rowCountHt2TargetAtPeer1);
// after sync up
for (int i = 0; i < NB_RETRIES; i++) {
syncUp(utility1);
rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
syncUp(UTIL1);
rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
if (i == NB_RETRIES - 1) {
if (rowCount_ht1TargetAtPeer1 != 100 || rowCount_ht2TargetAtPeer1 != 200) {
if (rowCountHt1TargetAtPeer1 != 100 || rowCountHt2TargetAtPeer1 != 200) {
// syncUp still failed. Let's look at the source in case anything is wrong there
utility1.restartHBaseCluster(1);
rowCount_ht1Source = utility1.countRows(ht1Source);
UTIL1.restartHBaseCluster(1);
rowCount_ht1Source = countRows(ht1Source);
LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
rowCount_ht2Source = utility1.countRows(ht2Source);
rowCount_ht2Source = countRows(ht2Source);
LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
}
assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
rowCount_ht1TargetAtPeer1);
rowCountHt1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
rowCount_ht2TargetAtPeer1);
rowCountHt2TargetAtPeer1);
}
if (rowCount_ht1TargetAtPeer1 == 100 && rowCount_ht2TargetAtPeer1 == 200) {
if (rowCountHt1TargetAtPeer1 == 100 && rowCountHt2TargetAtPeer1 == 200) {
LOG.info("SyncUpAfterPut succeeded at retry = " + i);
break;
} else {
LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
+ rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
+ rowCount_ht2TargetAtPeer1);
LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCountHt1TargetAtPeer1 =" +
rowCountHt1TargetAtPeer1 + " and rowCountHt2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1);
}
Thread.sleep(SLEEP_TIME);
}
}
protected void syncUp(HBaseTestingUtility ut) throws Exception {
ToolRunner.run(ut.getConfiguration(), new ReplicationSyncUp(), new String[0]);
}
}
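One idiom worth naming in the loops above: poll up to NB_RETRIES, sleep between attempts, and only assert on the final attempt, so transient replication lag does not fail the test. A standalone sketch of that idiom; the constants and the IntSupplier parameter are stand-ins, not HBase API:
import java.util.function.IntSupplier;
final class RetryAssert {
private static final int NB_RETRIES = 10;   // stand-in values
private static final long SLEEP_TIME = 500;
static void waitForCount(IntSupplier actual, int expected, String msg)
throws InterruptedException {
for (int i = 0; i < NB_RETRIES; i++) {
int count = actual.getAsInt();
if (count == expected) {
return; // converged; replication caught up
}
if (i == NB_RETRIES - 1) {
throw new AssertionError(msg + ": expected " + expected + " but got " + count);
}
Thread.sleep(SLEEP_TIME);
}
}
}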

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
public abstract class TestReplicationSyncUpToolBase {
protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
protected static final TableName TN1 = TableName.valueOf("t1_syncup");
protected static final TableName TN2 = TableName.valueOf("t2_syncup");
protected static final byte[] FAMILY = Bytes.toBytes("cf1");
protected static final byte[] QUALIFIER = Bytes.toBytes("q1");
protected static final byte[] NO_REP_FAMILY = Bytes.toBytes("norep");
protected TableDescriptor t1SyncupSource;
protected TableDescriptor t1SyncupTarget;
protected TableDescriptor t2SyncupSource;
protected TableDescriptor t2SyncupTarget;
protected Connection conn1;
protected Connection conn2;
protected Table ht1Source;
protected Table ht2Source;
protected Table ht1TargetAtPeer1;
protected Table ht2TargetAtPeer1;
protected void customizeClusterConf(Configuration conf) {
}
@Before
public void setUp() throws Exception {
customizeClusterConf(UTIL1.getConfiguration());
customizeClusterConf(UTIL2.getConfiguration());
TestReplicationBase.configureClusters(UTIL1, UTIL2);
UTIL1.startMiniZKCluster();
UTIL2.setZkCluster(UTIL1.getZkCluster());
UTIL1.startMiniCluster(2);
UTIL2.startMiniCluster(4);
t1SyncupSource = TableDescriptorBuilder.newBuilder(TN1)
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
t1SyncupTarget = TableDescriptorBuilder.newBuilder(TN1)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
t2SyncupSource = TableDescriptorBuilder.newBuilder(TN2)
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
t2SyncupTarget = TableDescriptorBuilder.newBuilder(TN2)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
}
@After
public void tearDown() throws Exception {
Closeables.close(ht1Source, true);
Closeables.close(ht2Source, true);
Closeables.close(ht1TargetAtPeer1, true);
Closeables.close(ht2TargetAtPeer1, true);
Closeables.close(conn1, true);
Closeables.close(conn2, true);
UTIL2.shutdownMiniCluster();
UTIL1.shutdownMiniCluster();
}
protected final void setupReplication() throws Exception {
Admin admin1 = UTIL1.getAdmin();
admin1.createTable(t1SyncupSource);
admin1.createTable(t2SyncupSource);
Admin admin2 = UTIL2.getAdmin();
admin2.createTable(t1SyncupTarget);
admin2.createTable(t2SyncupTarget);
// Get HTable from Master
// Assign the field, not a local, so tearDown() can close the connection
conn1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
ht1Source = conn1.getTable(TN1);
ht2Source = conn1.getTable(TN2);
// Get HTable from Peer1
conn2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
ht1TargetAtPeer1 = conn2.getTable(TN1);
ht2TargetAtPeer1 = conn2.getTable(TN2);
/**
* set M-S : Master: UTIL1 Slave1: UTIL2
*/
ReplicationPeerConfig rpc =
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build();
admin1.addReplicationPeer("1", rpc);
}
protected final void syncUp(HBaseTestingUtility util) throws Exception {
ToolRunner.run(util.getConfiguration(), new ReplicationSyncUp(), new String[0]);
}
}
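The new base class replaces per-subclass @BeforeClass conf mutation with a template-method hook: customizeClusterConf runs against both utilities' configurations before the mini clusters start. A minimal hypothetical subclass; the property tweaked here is only an example, the real overrides appear in the files below:
import org.apache.hadoop.conf.Configuration;
// Hypothetical subclass; the multiwal and bulk-load variants below override
// customizeClusterConf the same way.
public class TestReplicationSyncUpToolExample extends TestReplicationSyncUpToolBase {
@Override
protected void customizeClusterConf(Configuration conf) {
// Applied to UTIL1 and UTIL2 before startMiniCluster in setUp().
conf.setInt("hbase.regionserver.handler.count", 10); // illustrative setting
}
}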

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.replication;
import static org.apache.hadoop.hbase.HBaseTestingUtility.countRows;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.row;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
@ -26,6 +30,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -35,35 +40,34 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool {
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);
HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);
private static final Logger LOG = LoggerFactory
.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
conf1.set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());
TestReplicationBase.setUpBeforeClass();
}
private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);
@Override
protected void customizeClusterConf(Configuration conf) {
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
conf.set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());
}
@Test
public void testSyncUpTool() throws Exception {
/**
* Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily:
@ -77,7 +81,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
Iterator<String> randomHFileRangeListIterator = null;
Set<String> randomHFileRanges = new HashSet<>(16);
for (int i = 0; i < 16; i++) {
randomHFileRanges.add(utility1.getRandomUUID().toString());
randomHFileRanges.add(UTIL1.getRandomUUID().toString());
}
List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
Collections.sort(randomHFileRangeList);
@ -105,58 +109,58 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
throws Exception {
LOG.debug("mimicSyncUpAfterBulkLoad");
utility2.shutdownMiniHBaseCluster();
UTIL2.shutdownMiniHBaseCluster();
loadAndReplicateHFiles(false, randomHFileRangeListIterator);
int rowCount_ht1Source = utility1.countRows(ht1Source);
int rowCount_ht1Source = countRows(ht1Source);
assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206,
rowCount_ht1Source);
int rowCount_ht2Source = utility1.countRows(ht2Source);
int rowCount_ht2Source = countRows(ht2Source);
assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406,
rowCount_ht2Source);
utility1.shutdownMiniHBaseCluster();
utility2.restartHBaseCluster(1);
UTIL1.shutdownMiniHBaseCluster();
UTIL2.restartHBaseCluster(1);
Thread.sleep(SLEEP_TIME);
// Before sync up
int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);
// Run sync up tool
syncUp(utility1);
syncUp(UTIL1);
// After sync up
for (int i = 0; i < NB_RETRIES; i++) {
syncUp(utility1);
rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
syncUp(UTIL1);
rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
if (i == NB_RETRIES - 1) {
if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) {
if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) {
// syncUp still failed. Let's look at the source in case anything is wrong there
utility1.restartHBaseCluster(1);
rowCount_ht1Source = utility1.countRows(ht1Source);
UTIL1.restartHBaseCluster(1);
rowCount_ht1Source = countRows(ht1Source);
LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
rowCount_ht2Source = utility1.countRows(ht2Source);
rowCount_ht2Source = countRows(ht2Source);
LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
}
assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200,
rowCount_ht1TargetAtPeer1);
rowCountHt1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400,
rowCount_ht2TargetAtPeer1);
rowCountHt2TargetAtPeer1);
}
if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) {
if (rowCountHt1TargetAtPeer1 == 200 && rowCountHt2TargetAtPeer1 == 400) {
LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
break;
} else {
LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
+ rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
+ rowCount_ht2TargetAtPeer1);
LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i +
", with rowCountHt1TargetAtPeer1 =" + rowCountHt1TargetAtPeer1 +
" and rowCountHt2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1);
}
Thread.sleep(SLEEP_TIME);
}
@ -168,44 +172,42 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
// Load 100 + 3 hfiles to t1_syncup.
byte[][][] hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges,
100);
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 100);
hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source,
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
hfileRanges, 3);
// Load 200 + 3 hfiles to t2_syncup.
hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges,
200);
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 200);
hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source,
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht2Source,
hfileRanges, 3);
if (verifyReplicationOnSlave) {
// ensure replication completed
wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3,
wait(ht1TargetAtPeer1, countRows(ht1Source) - 3,
"t1_syncup has 103 rows on source, and 100 on slave1");
wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3,
wait(ht2TargetAtPeer1, countRows(ht2Source) - 3,
"t2_syncup has 203 rows on source, and 200 on slave1");
}
}
private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
Path dir = utility1.getDataTestDirOnTestFS(testName);
FileSystem fs = utility1.getTestFileSystem();
Path dir = UTIL1.getDataTestDirOnTestFS(testName);
FileSystem fs = UTIL1.getTestFileSystem();
dir = dir.makeQualified(fs);
Path familyDir = new Path(dir, Bytes.toString(fam));
@ -213,24 +215,23 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ hfileIdx++), fam, row, from, to, numOfRows);
HFileTestUtil.createHFile(UTIL1.getConfiguration(), fs,
new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
}
final TableName tableName = source.getName();
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration());
String[] args = { dir.toString(), tableName.toString() };
loader.run(args);
BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
loader.bulkLoad(tableName, dir);
}
private void wait(Table target, int expectedCount, String msg) throws IOException,
InterruptedException {
private void wait(Table target, int expectedCount, String msg)
throws IOException, InterruptedException {
for (int i = 0; i < NB_RETRIES; i++) {
int rowCount_ht2TargetAtPeer1 = utility2.countRows(target);
int rowCountHt2TargetAtPeer1 = countRows(target);
if (i == NB_RETRIES - 1) {
assertEquals(msg, expectedCount, rowCount_ht2TargetAtPeer1);
assertEquals(msg, expectedCount, rowCountHt2TargetAtPeer1);
}
if (expectedCount == rowCount_ht2TargetAtPeer1) {
if (expectedCount == rowCountHt2TargetAtPeer1) {
break;
}
Thread.sleep(SLEEP_TIME);

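The bulk-load call site in the hunks above also moved off the string-args tool: LoadIncrementalHFiles was driven through run(String[]), while the BulkLoadHFiles interface takes typed arguments. A before/after sketch, under the assumption of an existing table and a staged HFile directory:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
final class BulkLoadSketch {
// Before: new LoadIncrementalHFiles(conf).run(new String[] { dir.toString(), table.toString() });
// After: typed call, no Tool plumbing.
static void load(Configuration conf, TableName table, Path dir) throws Exception {
BulkLoadHFiles.create(conf).bulkLoad(table, dir);
}
}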
View File

@ -36,8 +36,8 @@ public class TestReplicationEndpointWithMultipleAsyncWAL extends TestReplication
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
TestReplicationEndpoint.setUpBeforeClass();
}
}

View File

@ -36,8 +36,8 @@ public class TestReplicationEndpointWithMultipleWAL extends TestReplicationEndpo
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
TestReplicationEndpoint.setUpBeforeClass();
}
}

View File

@ -37,8 +37,8 @@ public class TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL extends
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
TestReplicationKillMasterRSCompressed.setUpBeforeClass();
}
}

View File

@ -37,8 +37,8 @@ public class TestReplicationKillMasterRSCompressedWithMultipleWAL extends
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
TestReplicationKillMasterRSCompressed.setUpBeforeClass();
}
}

View File

@ -17,14 +17,13 @@
*/
package org.apache.hadoop.hbase.replication.multiwal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@ -35,10 +34,9 @@ public class TestReplicationSyncUpToolWithMultipleAsyncWAL extends TestReplicati
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithMultipleAsyncWAL.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
TestReplicationBase.setUpBeforeClass();
@Override
protected void customizeClusterConf(Configuration conf) {
conf.set(WALFactory.WAL_PROVIDER, "multiwal");
conf.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
}
}

View File

@ -17,14 +17,13 @@
*/
package org.apache.hadoop.hbase.replication.multiwal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@ -33,12 +32,11 @@ public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyn
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithMultipleWAL.class);
HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithMultipleWAL.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
TestReplicationBase.setUpBeforeClass();
@Override
protected void customizeClusterConf(Configuration conf) {
conf.set(WALFactory.WAL_PROVIDER, "multiwal");
conf.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
}
}

View File

@ -59,19 +59,19 @@ public class TestReplicator extends TestReplicationBase {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Set RPC size limit to 10kb (will be applied to both source and sink clusters)
conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
TestReplicationBase.setUpBeforeClass();
}
@Test
public void testReplicatorBatching() throws Exception {
// Clear the tables
truncateTable(utility1, tableName);
truncateTable(utility2, tableName);
truncateTable(UTIL1, tableName);
truncateTable(UTIL2, tableName);
// Replace the peer set up for us by the base class with a wrapper for this test
admin.addPeer("testReplicatorBatching",
new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey())
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()),
null);
@ -92,7 +92,7 @@ public class TestReplicator extends TestReplicationBase {
}
// Wait for replication to complete.
Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
LOG.info("Count=" + ReplicationEndpointForTest.getBatchCount());
@ -107,7 +107,7 @@ public class TestReplicator extends TestReplicationBase {
assertEquals("We sent an incorrect number of batches", NUM_ROWS,
ReplicationEndpointForTest.getBatchCount());
assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
} finally {
admin.removePeer("testReplicatorBatching");
}
@ -116,12 +116,12 @@ public class TestReplicator extends TestReplicationBase {
@Test
public void testReplicatorWithErrors() throws Exception {
// Clear the tables
truncateTable(utility1, tableName);
truncateTable(utility2, tableName);
truncateTable(UTIL1, tableName);
truncateTable(UTIL2, tableName);
// Replace the peer set up for us by the base class with a wrapper for this test
admin.addPeer("testReplicatorWithErrors",
new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey())
.setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
null);
@ -143,7 +143,7 @@ public class TestReplicator extends TestReplicationBase {
// Wait for replication to complete.
// We can expect 10 batches
Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
@ -155,7 +155,7 @@ public class TestReplicator extends TestReplicationBase {
}
});
assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
} finally {
admin.removePeer("testReplicatorWithErrors");
}