diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 6105a0d9007..28bf2491d1f 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -26,7 +26,6 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; @@ -39,13 +38,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; @@ -57,10 +56,14 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; @@ -73,8 +76,8 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.mapreduce.Job; import org.junit.Before; import org.junit.Rule; @@ -162,7 +165,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { htable1.put(put); Get get = new Get(row); - get.setMaxVersions(); + get.readAllVersions(); for (int i = 0; i < NB_RETRIES; i++) { if (i==NB_RETRIES-1) { fail("Waited too much time for put replication"); @@ -184,7 +187,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { htable1.delete(d); get = new Get(row); - get.setMaxVersions(); + get.readAllVersions(); for (int i = 0; i < NB_RETRIES; i++) { if (i==NB_RETRIES-1) { fail("Waited too much time for put replication"); @@ -327,7 +330,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { public void testDisableEnable() throws Exception { // Test disabling replication - admin.disablePeer(PEER_ID); + hbaseAdmin.disableReplicationPeer(PEER_ID); byte[] rowkey = Bytes.toBytes("disable enable"); Put put = new Put(rowkey); @@ -346,7 +349,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { } // Test enable replication - admin.enablePeer(PEER_ID); + hbaseAdmin.enableReplicationPeer(PEER_ID); for (int i = 0; i < NB_RETRIES; i++) { Result res = htable2.get(get); @@ -370,7 +373,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { @Test(timeout=300000) public void testAddAndRemoveClusters() throws Exception { LOG.info("testAddAndRemoveClusters"); - admin.removePeer(PEER_ID); + hbaseAdmin.removeReplicationPeer(PEER_ID); Thread.sleep(SLEEP_TIME); byte[] rowKey = Bytes.toBytes("Won't be replicated"); Put put = new Put(rowKey); @@ -392,7 +395,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { } ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(utility2.getClusterKey()); - admin.addPeer(PEER_ID, rpc, null); + hbaseAdmin.addReplicationPeer(PEER_ID, rpc); Thread.sleep(SLEEP_TIME); rowKey = Bytes.toBytes("do rep"); put = new Put(rowKey); @@ -525,13 +528,11 @@ public class TestReplicationSmallTests extends TestReplicationBase { Table lHtable2 = null; try { - HTableDescriptor table = new HTableDescriptor(tableName); - HColumnDescriptor fam = new HColumnDescriptor(familyname); - fam.setMaxVersions(100); - fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); - table.addFamily(fam); + ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname) + .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build(); + TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(fam).build(); scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); - for (HColumnDescriptor f : table.getColumnFamilies()) { + for (ColumnFamilyDescriptor f : table.getColumnFamilies()) { scopes.put(f.getName(), f.getScope()); } @@ -631,7 +632,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { htable1.put(put); Scan scan = new Scan(); - scan.setMaxVersions(100); + scan.readVersions(100); ResultScanner scanner1 = htable1.getScanner(scan); Result[] res1 = scanner1.next(1); scanner1.close(); @@ -641,7 +642,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { for (int i = 0; i < NB_RETRIES; i++) { scan = new Scan(); - scan.setMaxVersions(100); + scan.readVersions(100); scanner1 = htable2.getScanner(scan); res1 = scanner1.next(1); scanner1.close(); @@ -668,7 +669,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { htable2.put(put); scan = new Scan(); - scan.setMaxVersions(100); + scan.readVersions(100); scanner1 = htable2.getScanner(scan); res1 = scanner1.next(NB_ROWS_IN_BATCH); scanner1.close(); @@ -695,7 +696,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { htable1.put(put); Scan scan = new Scan(); - scan.setMaxVersions(100); + scan.readVersions(100); ResultScanner scanner1 = htable1.getScanner(scan); Result[] res1 = scanner1.next(1); scanner1.close(); @@ -705,7 +706,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { for (int i = 0; i < NB_RETRIES; i++) { scan = new Scan(); - scan.setMaxVersions(100); + scan.readVersions(100); scanner1 = htable2.getScanner(scan); res1 = scanner1.next(1); scanner1.close(); @@ -728,13 +729,13 @@ public class TestReplicationSmallTests extends TestReplicationBase { try { // Disabling replication and modifying the particular version of the cell to validate the feature. - admin.disablePeer(PEER_ID); + hbaseAdmin.disableReplicationPeer(PEER_ID); Put put2 = new Put(Bytes.toBytes("r1")); put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99")); htable2.put(put2); scan = new Scan(); - scan.setMaxVersions(100); + scan.readVersions(100); scanner1 = htable2.getScanner(scan); res1 = scanner1.next(NB_ROWS_IN_BATCH); scanner1.close(); @@ -745,7 +746,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { runVerifyReplication(args, 0, 1); } finally { - admin.enablePeer(PEER_ID); + hbaseAdmin.enableReplicationPeer(PEER_ID); } } @@ -786,21 +787,20 @@ public class TestReplicationSmallTests extends TestReplicationBase { // Create Tables for (int i = 0; i < numOfTables; i++) { - HTableDescriptor ht = new HTableDescriptor(TableName.valueOf(tName + i)); - HColumnDescriptor cfd = new HColumnDescriptor(colFam); - cfd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); - ht.addFamily(cfd); - hadmin.createTable(ht); + hadmin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(tName + i)) + .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(colFam)) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) + .build()); } // verify the result - List> replicationColFams = admin.listReplicated(); + List replicationColFams = hbaseAdmin.listReplicatedTableCFs(); int[] match = new int[numOfTables]; // array of 3 with init value of zero for (int i = 0; i < replicationColFams.size(); i++) { - HashMap replicationEntry = replicationColFams.get(i); - String tn = replicationEntry.get(ReplicationAdmin.TNAME); - if ((tn.startsWith(tName)) && replicationEntry.get(ReplicationAdmin.CFNAME).equals(colFam)) { + TableCFs replicationEntry = replicationColFams.get(i); + String tn = replicationEntry.getTable().getNameAsString(); + if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) { int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit match[m]++; // should only increase once } @@ -831,7 +831,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0); RegionInfo hri = region.getRegionInfo(); NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); - for (byte[] fam : htable1.getTableDescriptor().getFamiliesKeys()) { + for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) { scopes.put(fam, 1); } final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); @@ -918,14 +918,14 @@ public class TestReplicationSmallTests extends TestReplicationBase { Path rootDir = FSUtils.getRootDir(conf1); FileSystem fs = rootDir.getFileSystem(conf1); String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, new String(famName), sourceSnapshotName, rootDir, fs, true); // Take target snapshot Path peerRootDir = FSUtils.getRootDir(conf2); FileSystem peerFs = peerRootDir.getFileSystem(conf2); String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, new String(famName), peerSnapshotName, peerRootDir, peerFs, true); String peerFSAddress = peerFs.getUri().toString(); @@ -963,11 +963,11 @@ public class TestReplicationSmallTests extends TestReplicationBase { htable2.delete(delete); sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, new String(famName), sourceSnapshotName, rootDir, fs, true); peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); - SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName, + SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, new String(famName), peerSnapshotName, peerRootDir, peerFs, true); args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, @@ -1006,27 +1006,23 @@ public class TestReplicationSmallTests extends TestReplicationBase { emptyWalPaths.add(emptyWalPath); } - // inject our empty wal into the replication queue + // inject our empty wal into the replication queue, and then roll the original wal, which + // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to + // determine if the file being replicated currently is still opened for write, so just inject a + // new wal to the replication queue does not mean the previous file is closed. for (int i = 0; i < numRs; i++) { - Replication replicationService = - (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); + HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i); + Replication replicationService = (Replication) hrs.getReplicationSourceService(); replicationService.preLogRoll(null, emptyWalPaths.get(i)); replicationService.postLogRoll(null, emptyWalPaths.get(i)); - } - - // wait for ReplicationSource to start reading from our empty wal - waitForLogAdvance(numRs, emptyWalPaths, false); - - // roll the original wal, which enqueues a new wal behind our empty wal - for (int i = 0; i < numRs; i++) { RegionInfo regionInfo = utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); - WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); + WAL wal = hrs.getWAL(regionInfo); wal.rollWriter(true); } // ReplicationSource should advance past the empty wal, or else the test will fail - waitForLogAdvance(numRs, emptyWalPaths, true); + waitForLogAdvance(numRs); // we're now writing to the new wal // if everything works, the source should've stopped reading from the empty wal, and start @@ -1035,26 +1031,25 @@ public class TestReplicationSmallTests extends TestReplicationBase { } /** - * Waits for the ReplicationSource to start reading from the given paths + * Waits until there is only one log(the current writing one) in the replication queue * @param numRs number of regionservers - * @param emptyWalPaths path for each regionserver - * @param invert if true, waits until ReplicationSource is NOT reading from the given paths */ - private void waitForLogAdvance(final int numRs, final List emptyWalPaths, - final boolean invert) throws Exception { + private void waitForLogAdvance(int numRs) throws Exception { Waiter.waitFor(conf1, 10000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { for (int i = 0; i < numRs; i++) { + HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i); + RegionInfo regionInfo = + utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); + WAL wal = hrs.getWAL(regionInfo); + Path currentFile = ((AbstractFSWAL) wal).getCurrentFileName(); Replication replicationService = (Replication) utility1.getHBaseCluster() .getRegionServer(i).getReplicationSourceService(); for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() .getSources()) { ReplicationSource source = (ReplicationSource) rsi; - if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) { - return false; - } - if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) { + if (!currentFile.equals(source.getCurrentPath())) { return false; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 87918ee82d8..206b5003649 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -63,7 +63,7 @@ public class TestReplicationBase { protected static ZooKeeperWatcher zkw2; protected static ReplicationAdmin admin; - private static Admin hbaseAdmin; + protected static Admin hbaseAdmin; protected static Table htable1; protected static Table htable2;