HBASE-18845 TestReplicationSmallTests fails after HBASE-14004

This commit is contained in:
zhangduo 2017-09-25 12:07:19 +08:00
parent 0658252ed6
commit 2e4c1b6288
2 changed files with 56 additions and 61 deletions

View File

@ -26,7 +26,6 @@ import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.TreeMap; import java.util.TreeMap;
@ -39,13 +38,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
@ -57,10 +56,14 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
@ -73,8 +76,8 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
@ -162,7 +165,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
htable1.put(put); htable1.put(put);
Get get = new Get(row); Get get = new Get(row);
get.setMaxVersions(); get.readAllVersions();
for (int i = 0; i < NB_RETRIES; i++) { for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) { if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication"); fail("Waited too much time for put replication");
@ -184,7 +187,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
htable1.delete(d); htable1.delete(d);
get = new Get(row); get = new Get(row);
get.setMaxVersions(); get.readAllVersions();
for (int i = 0; i < NB_RETRIES; i++) { for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) { if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication"); fail("Waited too much time for put replication");
@ -327,7 +330,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
public void testDisableEnable() throws Exception { public void testDisableEnable() throws Exception {
// Test disabling replication // Test disabling replication
admin.disablePeer(PEER_ID); hbaseAdmin.disableReplicationPeer(PEER_ID);
byte[] rowkey = Bytes.toBytes("disable enable"); byte[] rowkey = Bytes.toBytes("disable enable");
Put put = new Put(rowkey); Put put = new Put(rowkey);
@ -346,7 +349,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
} }
// Test enable replication // Test enable replication
admin.enablePeer(PEER_ID); hbaseAdmin.enableReplicationPeer(PEER_ID);
for (int i = 0; i < NB_RETRIES; i++) { for (int i = 0; i < NB_RETRIES; i++) {
Result res = htable2.get(get); Result res = htable2.get(get);
@ -370,7 +373,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
@Test(timeout=300000) @Test(timeout=300000)
public void testAddAndRemoveClusters() throws Exception { public void testAddAndRemoveClusters() throws Exception {
LOG.info("testAddAndRemoveClusters"); LOG.info("testAddAndRemoveClusters");
admin.removePeer(PEER_ID); hbaseAdmin.removeReplicationPeer(PEER_ID);
Thread.sleep(SLEEP_TIME); Thread.sleep(SLEEP_TIME);
byte[] rowKey = Bytes.toBytes("Won't be replicated"); byte[] rowKey = Bytes.toBytes("Won't be replicated");
Put put = new Put(rowKey); Put put = new Put(rowKey);
@ -392,7 +395,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
} }
ReplicationPeerConfig rpc = new ReplicationPeerConfig(); ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey()); rpc.setClusterKey(utility2.getClusterKey());
admin.addPeer(PEER_ID, rpc, null); hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
Thread.sleep(SLEEP_TIME); Thread.sleep(SLEEP_TIME);
rowKey = Bytes.toBytes("do rep"); rowKey = Bytes.toBytes("do rep");
put = new Put(rowKey); put = new Put(rowKey);
@ -525,13 +528,11 @@ public class TestReplicationSmallTests extends TestReplicationBase {
Table lHtable2 = null; Table lHtable2 = null;
try { try {
HTableDescriptor table = new HTableDescriptor(tableName); ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
HColumnDescriptor fam = new HColumnDescriptor(familyname); .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
fam.setMaxVersions(100); TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(fam).build();
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (HColumnDescriptor f : table.getColumnFamilies()) { for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
scopes.put(f.getName(), f.getScope()); scopes.put(f.getName(), f.getScope());
} }
@ -631,7 +632,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
htable1.put(put); htable1.put(put);
Scan scan = new Scan(); Scan scan = new Scan();
scan.setMaxVersions(100); scan.readVersions(100);
ResultScanner scanner1 = htable1.getScanner(scan); ResultScanner scanner1 = htable1.getScanner(scan);
Result[] res1 = scanner1.next(1); Result[] res1 = scanner1.next(1);
scanner1.close(); scanner1.close();
@ -641,7 +642,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
for (int i = 0; i < NB_RETRIES; i++) { for (int i = 0; i < NB_RETRIES; i++) {
scan = new Scan(); scan = new Scan();
scan.setMaxVersions(100); scan.readVersions(100);
scanner1 = htable2.getScanner(scan); scanner1 = htable2.getScanner(scan);
res1 = scanner1.next(1); res1 = scanner1.next(1);
scanner1.close(); scanner1.close();
@ -668,7 +669,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
htable2.put(put); htable2.put(put);
scan = new Scan(); scan = new Scan();
scan.setMaxVersions(100); scan.readVersions(100);
scanner1 = htable2.getScanner(scan); scanner1 = htable2.getScanner(scan);
res1 = scanner1.next(NB_ROWS_IN_BATCH); res1 = scanner1.next(NB_ROWS_IN_BATCH);
scanner1.close(); scanner1.close();
@ -695,7 +696,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
htable1.put(put); htable1.put(put);
Scan scan = new Scan(); Scan scan = new Scan();
scan.setMaxVersions(100); scan.readVersions(100);
ResultScanner scanner1 = htable1.getScanner(scan); ResultScanner scanner1 = htable1.getScanner(scan);
Result[] res1 = scanner1.next(1); Result[] res1 = scanner1.next(1);
scanner1.close(); scanner1.close();
@ -705,7 +706,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
for (int i = 0; i < NB_RETRIES; i++) { for (int i = 0; i < NB_RETRIES; i++) {
scan = new Scan(); scan = new Scan();
scan.setMaxVersions(100); scan.readVersions(100);
scanner1 = htable2.getScanner(scan); scanner1 = htable2.getScanner(scan);
res1 = scanner1.next(1); res1 = scanner1.next(1);
scanner1.close(); scanner1.close();
@ -728,13 +729,13 @@ public class TestReplicationSmallTests extends TestReplicationBase {
try { try {
// Disabling replication and modifying the particular version of the cell to validate the feature. // Disabling replication and modifying the particular version of the cell to validate the feature.
admin.disablePeer(PEER_ID); hbaseAdmin.disableReplicationPeer(PEER_ID);
Put put2 = new Put(Bytes.toBytes("r1")); Put put2 = new Put(Bytes.toBytes("r1"));
put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99")); put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99"));
htable2.put(put2); htable2.put(put2);
scan = new Scan(); scan = new Scan();
scan.setMaxVersions(100); scan.readVersions(100);
scanner1 = htable2.getScanner(scan); scanner1 = htable2.getScanner(scan);
res1 = scanner1.next(NB_ROWS_IN_BATCH); res1 = scanner1.next(NB_ROWS_IN_BATCH);
scanner1.close(); scanner1.close();
@ -745,7 +746,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
runVerifyReplication(args, 0, 1); runVerifyReplication(args, 0, 1);
} }
finally { finally {
admin.enablePeer(PEER_ID); hbaseAdmin.enableReplicationPeer(PEER_ID);
} }
} }
@ -786,21 +787,20 @@ public class TestReplicationSmallTests extends TestReplicationBase {
// Create Tables // Create Tables
for (int i = 0; i < numOfTables; i++) { for (int i = 0; i < numOfTables; i++) {
HTableDescriptor ht = new HTableDescriptor(TableName.valueOf(tName + i)); hadmin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(tName + i))
HColumnDescriptor cfd = new HColumnDescriptor(colFam); .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(colFam))
cfd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
ht.addFamily(cfd); .build());
hadmin.createTable(ht);
} }
// verify the result // verify the result
List<HashMap<String, String>> replicationColFams = admin.listReplicated(); List<TableCFs> replicationColFams = hbaseAdmin.listReplicatedTableCFs();
int[] match = new int[numOfTables]; // array of 3 with init value of zero int[] match = new int[numOfTables]; // array of 3 with init value of zero
for (int i = 0; i < replicationColFams.size(); i++) { for (int i = 0; i < replicationColFams.size(); i++) {
HashMap<String, String> replicationEntry = replicationColFams.get(i); TableCFs replicationEntry = replicationColFams.get(i);
String tn = replicationEntry.get(ReplicationAdmin.TNAME); String tn = replicationEntry.getTable().getNameAsString();
if ((tn.startsWith(tName)) && replicationEntry.get(ReplicationAdmin.CFNAME).equals(colFam)) { if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) {
int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
match[m]++; // should only increase once match[m]++; // should only increase once
} }
@ -831,7 +831,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0); HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0);
RegionInfo hri = region.getRegionInfo(); RegionInfo hri = region.getRegionInfo();
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (byte[] fam : htable1.getTableDescriptor().getFamiliesKeys()) { for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) {
scopes.put(fam, 1); scopes.put(fam, 1);
} }
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
@ -918,14 +918,14 @@ public class TestReplicationSmallTests extends TestReplicationBase {
Path rootDir = FSUtils.getRootDir(conf1); Path rootDir = FSUtils.getRootDir(conf1);
FileSystem fs = rootDir.getFileSystem(conf1); FileSystem fs = rootDir.getFileSystem(conf1);
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName, SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
new String(famName), sourceSnapshotName, rootDir, fs, true); new String(famName), sourceSnapshotName, rootDir, fs, true);
// Take target snapshot // Take target snapshot
Path peerRootDir = FSUtils.getRootDir(conf2); Path peerRootDir = FSUtils.getRootDir(conf2);
FileSystem peerFs = peerRootDir.getFileSystem(conf2); FileSystem peerFs = peerRootDir.getFileSystem(conf2);
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName, SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
new String(famName), peerSnapshotName, peerRootDir, peerFs, true); new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
String peerFSAddress = peerFs.getUri().toString(); String peerFSAddress = peerFs.getUri().toString();
@ -963,11 +963,11 @@ public class TestReplicationSmallTests extends TestReplicationBase {
htable2.delete(delete); htable2.delete(delete);
sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName, SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
new String(famName), sourceSnapshotName, rootDir, fs, true); new String(famName), sourceSnapshotName, rootDir, fs, true);
peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName, SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
new String(famName), peerSnapshotName, peerRootDir, peerFs, true); new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
@ -1006,27 +1006,23 @@ public class TestReplicationSmallTests extends TestReplicationBase {
emptyWalPaths.add(emptyWalPath); emptyWalPaths.add(emptyWalPath);
} }
// inject our empty wal into the replication queue // inject our empty wal into the replication queue, and then roll the original wal, which
// enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
// determine if the file being replicated currently is still opened for write, so just inject a
// new wal to the replication queue does not mean the previous file is closed.
for (int i = 0; i < numRs; i++) { for (int i = 0; i < numRs; i++) {
Replication replicationService = HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
(Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); Replication replicationService = (Replication) hrs.getReplicationSourceService();
replicationService.preLogRoll(null, emptyWalPaths.get(i)); replicationService.preLogRoll(null, emptyWalPaths.get(i));
replicationService.postLogRoll(null, emptyWalPaths.get(i)); replicationService.postLogRoll(null, emptyWalPaths.get(i));
}
// wait for ReplicationSource to start reading from our empty wal
waitForLogAdvance(numRs, emptyWalPaths, false);
// roll the original wal, which enqueues a new wal behind our empty wal
for (int i = 0; i < numRs; i++) {
RegionInfo regionInfo = RegionInfo regionInfo =
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); WAL wal = hrs.getWAL(regionInfo);
wal.rollWriter(true); wal.rollWriter(true);
} }
// ReplicationSource should advance past the empty wal, or else the test will fail // ReplicationSource should advance past the empty wal, or else the test will fail
waitForLogAdvance(numRs, emptyWalPaths, true); waitForLogAdvance(numRs);
// we're now writing to the new wal // we're now writing to the new wal
// if everything works, the source should've stopped reading from the empty wal, and start // if everything works, the source should've stopped reading from the empty wal, and start
@ -1035,26 +1031,25 @@ public class TestReplicationSmallTests extends TestReplicationBase {
} }
/** /**
* Waits for the ReplicationSource to start reading from the given paths * Waits until there is only one log(the current writing one) in the replication queue
* @param numRs number of regionservers * @param numRs number of regionservers
* @param emptyWalPaths path for each regionserver
* @param invert if true, waits until ReplicationSource is NOT reading from the given paths
*/ */
private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths, private void waitForLogAdvance(int numRs) throws Exception {
final boolean invert) throws Exception {
Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() { Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
@Override @Override
public boolean evaluate() throws Exception { public boolean evaluate() throws Exception {
for (int i = 0; i < numRs; i++) { for (int i = 0; i < numRs; i++) {
HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
RegionInfo regionInfo =
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
Replication replicationService = (Replication) utility1.getHBaseCluster() Replication replicationService = (Replication) utility1.getHBaseCluster()
.getRegionServer(i).getReplicationSourceService(); .getRegionServer(i).getReplicationSourceService();
for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
.getSources()) { .getSources()) {
ReplicationSource source = (ReplicationSource) rsi; ReplicationSource source = (ReplicationSource) rsi;
if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) { if (!currentFile.equals(source.getCurrentPath())) {
return false;
}
if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
return false; return false;
} }
} }

View File

@ -63,7 +63,7 @@ public class TestReplicationBase {
protected static ZooKeeperWatcher zkw2; protected static ZooKeeperWatcher zkw2;
protected static ReplicationAdmin admin; protected static ReplicationAdmin admin;
private static Admin hbaseAdmin; protected static Admin hbaseAdmin;
protected static Table htable1; protected static Table htable1;
protected static Table htable2; protected static Table htable2;