diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
index a76bb11515d..10a1b0634f7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -34,10 +36,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.exceptions.SnapshotCreationException;
 import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
@@ -56,6 +61,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.MD5Hash;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -79,6 +85,7 @@ public class TestFlushSnapshotFromClient {
   private static final int NUM_RS = 2;
   private static final String STRING_TABLE_NAME = "test";
   private static final byte[] TEST_FAM = Bytes.toBytes("fam");
+  private static final byte[] TEST_QUAL = Bytes.toBytes("q");
   private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
 
   /**
@@ -106,7 +113,7 @@ public class TestFlushSnapshotFromClient {
     // block writes if we get to 12 store files
     conf.setInt("hbase.hstore.blockingStoreFiles", 12);
     // drop the number of attempts for the hbase admin
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
     // Enable snapshot
     conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
     conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
@@ -115,7 +122,8 @@
 
   @Before
   public void setup() throws Exception {
-    UTIL.createTable(TABLE_NAME, TEST_FAM);
+    createTable(TABLE_NAME, TEST_FAM);
+    assertTrue(UTIL.getHBaseAdmin().getTableRegions(TABLE_NAME).size() > 2);
   }
 
   @After
@@ -151,7 +159,7 @@ public class TestFlushSnapshotFromClient {
 
     // put some stuff in the table
     HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME);
-    UTIL.loadTable(table, TEST_FAM);
+    loadData(table, 10000, TEST_FAM);
 
     // get the name of all the regionservers hosting the snapshotted table
     Set<String> snapshotServers = new HashSet<String>();
@@ -204,7 +212,7 @@ public class TestFlushSnapshotFromClient {
       try {
         admin.getTableDescriptor(Bytes.toBytes(tableName));
         fail = true;
-        LOG.error("Table:" + tableName + " already exists, checking a new name");
+        LOG.error("Table:" + tableName + " already exists, checking a new name");
         tableName = tableName+"!";
       } catch (TableNotFoundException e) {
         fail = false;
@@ -249,6 +257,112 @@ public class TestFlushSnapshotFromClient {
   }
 
+  @Test
+  public void testSnapshotStateAfterMerge() throws Exception {
+    int numRows = 10000;
+    HBaseAdmin admin = UTIL.getHBaseAdmin();
+    // make sure we don't fail on listing snapshots
+    SnapshotTestingUtils.assertNoSnapshots(admin);
+    // load the table so we have some data
+    loadData(new HTable(UTIL.getConfiguration(), TABLE_NAME), numRows, TEST_FAM);
+    // and wait until everything stabilizes
+    waitForTableToBeOnline(TABLE_NAME);
+
+    // Take a snapshot
+    String snapshotBeforeMergeName = "snapshotBeforeMerge";
+    admin.snapshot(snapshotBeforeMergeName, STRING_TABLE_NAME, SnapshotDescription.Type.FLUSH);
+
+    // Clone the table
+    String cloneBeforeMergeName = "cloneBeforeMerge";
+    admin.cloneSnapshot(snapshotBeforeMergeName, cloneBeforeMergeName);
+    waitForTableToBeOnline(Bytes.toBytes(cloneBeforeMergeName));
+
+    // Merge two regions
+    List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
+    Collections.sort(regions, new Comparator<HRegionInfo>() {
+      public int compare(HRegionInfo r1, HRegionInfo r2) {
+        return Bytes.compareTo(r1.getStartKey(), r2.getStartKey());
+      }
+    });
+
+    int numRegions = admin.getTableRegions(TABLE_NAME).size();
+    int numRegionsAfterMerge = numRegions - 2;
+    admin.mergeRegions(regions.get(1).getEncodedNameAsBytes(),
+        regions.get(2).getEncodedNameAsBytes(), true);
+    admin.mergeRegions(regions.get(5).getEncodedNameAsBytes(),
+        regions.get(6).getEncodedNameAsBytes(), true);
+
+    // Verify that there's one region less
+    waitRegionsAfterMerge(numRegionsAfterMerge);
+    assertEquals(numRegionsAfterMerge, admin.getTableRegions(TABLE_NAME).size());
+
+    // Clone the table
+    String cloneAfterMergeName = "cloneAfterMerge";
+    admin.cloneSnapshot(snapshotBeforeMergeName, cloneAfterMergeName);
+    waitForTableToBeOnline(Bytes.toBytes(cloneAfterMergeName));
+
+    verifyRowCount(TABLE_NAME, numRows);
+    verifyRowCount(Bytes.toBytes(cloneBeforeMergeName), numRows);
+    verifyRowCount(Bytes.toBytes(cloneAfterMergeName), numRows);
+
+    // test that we can delete the snapshot
+    UTIL.deleteTable(cloneAfterMergeName);
+    UTIL.deleteTable(cloneBeforeMergeName);
+    admin.deleteSnapshot(snapshotBeforeMergeName);
+
+    // make sure we don't have any snapshots
+    SnapshotTestingUtils.assertNoSnapshots(admin);
+  }
+
+  @Test
+  public void testTakeSnapshotAfterMerge() throws Exception {
+    int numRows = 10000;
+    HBaseAdmin admin = UTIL.getHBaseAdmin();
+    // make sure we don't fail on listing snapshots
+    SnapshotTestingUtils.assertNoSnapshots(admin);
+    // load the table so we have some data
+    loadData(new HTable(UTIL.getConfiguration(), TABLE_NAME), numRows, TEST_FAM);
+    // and wait until everything stabilizes
+    waitForTableToBeOnline(TABLE_NAME);
+
+    // Merge two regions
+    List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
+    Collections.sort(regions, new Comparator<HRegionInfo>() {
+      public int compare(HRegionInfo r1, HRegionInfo r2) {
+        return Bytes.compareTo(r1.getStartKey(), r2.getStartKey());
+      }
+    });
+
+    int numRegions = admin.getTableRegions(TABLE_NAME).size();
+    int numRegionsAfterMerge = numRegions - 2;
+    admin.mergeRegions(regions.get(1).getEncodedNameAsBytes(),
+        regions.get(2).getEncodedNameAsBytes(), true);
+    admin.mergeRegions(regions.get(5).getEncodedNameAsBytes(),
+        regions.get(6).getEncodedNameAsBytes(), true);
+
+    waitRegionsAfterMerge(numRegionsAfterMerge);
+    assertEquals(numRegionsAfterMerge, admin.getTableRegions(TABLE_NAME).size());
+
+    // Take a snapshot
+    String snapshotName = "snapshotAfterMerge";
+    admin.snapshot(snapshotName, STRING_TABLE_NAME, SnapshotDescription.Type.FLUSH);
+
+    // Clone the table
+    String cloneName = "cloneMerge";
+    admin.cloneSnapshot(snapshotName, cloneName);
+    waitForTableToBeOnline(Bytes.toBytes(cloneName));
+
+    verifyRowCount(TABLE_NAME, numRows);
+    verifyRowCount(Bytes.toBytes(cloneName), numRows);
+
+    // test that we can delete the snapshot
+    UTIL.deleteTable(cloneName);
+    admin.deleteSnapshot(snapshotName);
+
+    // make sure we don't have any snapshots
+    SnapshotTestingUtils.assertNoSnapshots(admin);
+  }
+
   /**
    * Basic end-to-end test of simple-flush-based snapshots
    */
@@ -259,7 +373,7 @@ public class TestFlushSnapshotFromClient {
     // make sure we don't fail on listing snapshots
     SnapshotTestingUtils.assertNoSnapshots(admin);
     // load the table so we have some data
-    UTIL.loadTable(new HTable(UTIL.getConfiguration(), TABLE_NAME), TEST_FAM);
+    loadData(new HTable(UTIL.getConfiguration(), TABLE_NAME), 10000, TEST_FAM);
     // and wait until everything stabilizes
     waitForTableToBeOnline(TABLE_NAME);
 
@@ -289,6 +403,7 @@ public class TestFlushSnapshotFromClient {
 
     // check the region snapshot for all the regions
     List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
+    assertTrue(regions.size() > 1);
     for (HRegionInfo info : regions) {
       String regionName = info.getEncodedName();
       Path regionDir = new Path(snapshotDir, regionName);
@@ -297,6 +412,7 @@ public class TestFlushSnapshotFromClient {
       // check to make sure we have the family
       Path familyDir = new Path(regionDir, Bytes.toString(TEST_FAM));
       assertTrue(fs.exists(familyDir));
+      // make sure we have some file references
      assertTrue(fs.listStatus(familyDir).length > 0);
     }
 
@@ -326,10 +442,10 @@ public class TestFlushSnapshotFromClient {
     // make sure we don't fail on listing snapshots
     SnapshotTestingUtils.assertNoSnapshots(admin);
     // create second testing table
-    UTIL.createTable(TABLE2_NAME, TEST_FAM);
+    createTable(TABLE2_NAME, TEST_FAM);
     // load the table so we have some data
-    UTIL.loadTable(new HTable(UTIL.getConfiguration(), TABLE_NAME), TEST_FAM);
-    UTIL.loadTable(new HTable(UTIL.getConfiguration(), TABLE2_NAME), TEST_FAM);
+    loadData(new HTable(UTIL.getConfiguration(), TABLE_NAME), 10000, TEST_FAM);
+    loadData(new HTable(UTIL.getConfiguration(), TABLE2_NAME), 10000, TEST_FAM);
     // and wait until everything stabilizes
     waitForTableToBeOnline(TABLE_NAME);
     waitForTableToBeOnline(TABLE2_NAME);
@@ -435,5 +551,70 @@ public class TestFlushSnapshotFromClient {
     for (HRegion region : onlineRegions) {
       region.waitForFlushesAndCompactions();
     }
+    UTIL.getHBaseAdmin().isTableAvailable(tableName);
+  }
+
+  private void waitRegionsAfterMerge(final long numRegionsAfterMerge)
+      throws IOException, InterruptedException {
+    HBaseAdmin admin = UTIL.getHBaseAdmin();
+    // Verify that there's one region less
+    long startTime = System.currentTimeMillis();
+    while (admin.getTableRegions(TABLE_NAME).size() != numRegionsAfterMerge) {
+      // This may be flaky... if after 15sec the merge is not complete give up
+      // it will fail in the assertEquals(numRegionsAfterMerge).
+      if ((System.currentTimeMillis() - startTime) > 15000) break;
+      Thread.sleep(100);
+    }
+    waitForTableToBeOnline(TABLE_NAME);
+  }
+
+  private void createTable(final byte[] tableName, final byte[]... families) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for (byte[] family: families) {
+      HColumnDescriptor hcd = new HColumnDescriptor(family);
+      htd.addFamily(hcd);
+    }
+    byte[][] splitKeys = new byte[14][];
+    byte[] hex = Bytes.toBytes("123456789abcde");
+    for (int i = 0; i < splitKeys.length; ++i) {
+      splitKeys[i] = new byte[] { hex[i] };
+    }
+    UTIL.getHBaseAdmin().createTable(htd, splitKeys);
+  }
+
+  public void loadData(final HTable table, int rows, byte[]... families) throws IOException {
+    table.setAutoFlush(false);
+
+    assertTrue(rows >= 16);
+    for (char k: "0123456789abcdef".toCharArray()) {
+      byte[] value = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(System.currentTimeMillis()));
+      byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
+      putData(table, families, key, value);
+      rows--;
+    }
+
+    while (rows-- > 0) {
+      byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows));
+      byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
+      putData(table, families, key, value);
+    }
+    table.flushCommits();
+  }
+
+  private void putData(final HTable table, final byte[][] families,
+      final byte[] key, final byte[] value) throws IOException {
+    Put put = new Put(key);
+    put.setDurability(Durability.SKIP_WAL);
+    for (byte[] family: families) {
+      put.add(family, TEST_QUAL, value);
+    }
+    table.put(put);
+  }
+
+  private void verifyRowCount(final byte[] tableName, long expectedRows) throws IOException {
+    HTable table = new HTable(UTIL.getConfiguration(), tableName);
+    assertEquals(expectedRows, UTIL.countRows(table));
+    table.close();
   }
 }
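
Note on the pre-split/load strategy introduced above: createTable() pre-splits the test table on the single-byte keys '1' through 'e' (15 regions), and loadData() derives every row key from the hex MD5 of the cell value, so writes scatter roughly uniformly across all of those regions and the merge tests still find data in every region. The code below is a minimal, standalone sketch of that key distribution, assuming only the plain JDK (java.security.MessageDigest stands in for HBase's MD5Hash, and the class name KeyDistributionSketch is hypothetical); it is illustrative only and not part of the patch.

// KeyDistributionSketch.java -- hypothetical, illustrative only (not part of the patch).
// Mirrors the loadData() key scheme: row key = hex MD5 of the value, so the first key
// character is spread over '0'..'f' and rows land in every region of the pre-split table.
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.TreeMap;

public class KeyDistributionSketch {
  public static void main(String[] args) throws Exception {
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    TreeMap<Character, Integer> buckets = new TreeMap<Character, Integer>();
    for (int i = 0; i < 10000; i++) {
      // Stand-in for the timestamp/counter-based values written by loadData()
      byte[] value = ("value-" + System.nanoTime() + "-" + i).getBytes(StandardCharsets.UTF_8);
      byte[] digest = md5.digest(value);
      // Hex-encode the digest, like MD5Hash.getMD5AsHex(value)
      StringBuilder hex = new StringBuilder();
      for (byte b : digest) {
        hex.append(String.format("%02x", b & 0xff));
      }
      char firstChar = hex.charAt(0);
      Integer count = buckets.get(firstChar);
      buckets.put(firstChar, count == null ? 1 : count + 1);
    }
    // Expect roughly 10000 / 16 = 625 keys per leading hex digit, i.e. per region
    for (Map.Entry<Character, Integer> e : buckets.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}

Running it prints on the order of 625 keys per leading hex digit, which is why merging two region pairs still leaves every remaining region with data to flush, snapshot, and verify via verifyRowCount().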