HBASE-8984 Test online snapshots with online merge

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1505790 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
mbertozzi 2013-07-22 20:14:54 +00:00
parent 53ed511717
commit a8d6626c67
1 changed files with 189 additions and 8 deletions

View File

@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -34,10 +36,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.exceptions.SnapshotCreationException;
import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
import org.apache.hadoop.hbase.ipc.RpcClient;
@ -56,6 +61,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.AfterClass;
@ -79,6 +85,7 @@ public class TestFlushSnapshotFromClient {
private static final int NUM_RS = 2;
private static final String STRING_TABLE_NAME = "test";
private static final byte[] TEST_FAM = Bytes.toBytes("fam");
private static final byte[] TEST_QUAL = Bytes.toBytes("q");
private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
/**
@ -106,7 +113,7 @@ public class TestFlushSnapshotFromClient {
// block writes if we get to 12 store files
conf.setInt("hbase.hstore.blockingStoreFiles", 12);
// drop the number of attempts for the hbase admin
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
// Enable snapshot
conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
@ -115,7 +122,8 @@ public class TestFlushSnapshotFromClient {
@Before
public void setup() throws Exception {
UTIL.createTable(TABLE_NAME, TEST_FAM);
createTable(TABLE_NAME, TEST_FAM);
assertTrue(UTIL.getHBaseAdmin().getTableRegions(TABLE_NAME).size() > 2);
}
@After
@ -151,7 +159,7 @@ public class TestFlushSnapshotFromClient {
// put some stuff in the table
HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME);
UTIL.loadTable(table, TEST_FAM);
loadData(table, 10000, TEST_FAM);
// get the name of all the regionservers hosting the snapshotted table
Set<String> snapshotServers = new HashSet<String>();
@ -249,6 +257,112 @@ public class TestFlushSnapshotFromClient {
}
@Test
public void testSnapshotStateAfterMerge() throws Exception {
int numRows = 10000;
HBaseAdmin admin = UTIL.getHBaseAdmin();
// make sure we don't fail on listing snapshots
SnapshotTestingUtils.assertNoSnapshots(admin);
// load the table so we have some data
loadData(new HTable(UTIL.getConfiguration(), TABLE_NAME), numRows, TEST_FAM);
// and wait until everything stabilizes
waitForTableToBeOnline(TABLE_NAME);
// Take a snapshot
String snapshotBeforeMergeName = "snapshotBeforeMerge";
admin.snapshot(snapshotBeforeMergeName, STRING_TABLE_NAME, SnapshotDescription.Type.FLUSH);
// Clone the table
String cloneBeforeMergeName = "cloneBeforeMerge";
admin.cloneSnapshot(snapshotBeforeMergeName, cloneBeforeMergeName);
waitForTableToBeOnline(Bytes.toBytes(cloneBeforeMergeName));
// Merge two regions
List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
Collections.sort(regions, new Comparator<HRegionInfo>() {
public int compare(HRegionInfo r1, HRegionInfo r2) {
return Bytes.compareTo(r1.getStartKey(), r2.getStartKey());
}
});
int numRegions = admin.getTableRegions(TABLE_NAME).size();
int numRegionsAfterMerge = numRegions - 2;
admin.mergeRegions(regions.get(1).getEncodedNameAsBytes(),
regions.get(2).getEncodedNameAsBytes(), true);
admin.mergeRegions(regions.get(5).getEncodedNameAsBytes(),
regions.get(6).getEncodedNameAsBytes(), true);
// Verify that there's one region less
waitRegionsAfterMerge(numRegionsAfterMerge);
assertEquals(numRegionsAfterMerge, admin.getTableRegions(TABLE_NAME).size());
// Clone the table
String cloneAfterMergeName = "cloneAfterMerge";
admin.cloneSnapshot(snapshotBeforeMergeName, cloneAfterMergeName);
waitForTableToBeOnline(Bytes.toBytes(cloneAfterMergeName));
verifyRowCount(TABLE_NAME, numRows);
verifyRowCount(Bytes.toBytes(cloneBeforeMergeName), numRows);
verifyRowCount(Bytes.toBytes(cloneAfterMergeName), numRows);
// test that we can delete the snapshot
UTIL.deleteTable(cloneAfterMergeName);
UTIL.deleteTable(cloneBeforeMergeName);
admin.deleteSnapshot(snapshotBeforeMergeName);
// make sure we don't have any snapshots
SnapshotTestingUtils.assertNoSnapshots(admin);
}
@Test
public void testTakeSnapshotAfterMerge() throws Exception {
int numRows = 10000;
HBaseAdmin admin = UTIL.getHBaseAdmin();
// make sure we don't fail on listing snapshots
SnapshotTestingUtils.assertNoSnapshots(admin);
// load the table so we have some data
loadData(new HTable(UTIL.getConfiguration(), TABLE_NAME), numRows, TEST_FAM);
// and wait until everything stabilizes
waitForTableToBeOnline(TABLE_NAME);
// Merge two regions
List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
Collections.sort(regions, new Comparator<HRegionInfo>() {
public int compare(HRegionInfo r1, HRegionInfo r2) {
return Bytes.compareTo(r1.getStartKey(), r2.getStartKey());
}
});
int numRegions = admin.getTableRegions(TABLE_NAME).size();
int numRegionsAfterMerge = numRegions - 2;
admin.mergeRegions(regions.get(1).getEncodedNameAsBytes(),
regions.get(2).getEncodedNameAsBytes(), true);
admin.mergeRegions(regions.get(5).getEncodedNameAsBytes(),
regions.get(6).getEncodedNameAsBytes(), true);
waitRegionsAfterMerge(numRegionsAfterMerge);
assertEquals(numRegionsAfterMerge, admin.getTableRegions(TABLE_NAME).size());
// Take a snapshot
String snapshotName = "snapshotAfterMerge";
admin.snapshot(snapshotName, STRING_TABLE_NAME, SnapshotDescription.Type.FLUSH);
// Clone the table
String cloneName = "cloneMerge";
admin.cloneSnapshot(snapshotName, cloneName);
waitForTableToBeOnline(Bytes.toBytes(cloneName));
verifyRowCount(TABLE_NAME, numRows);
verifyRowCount(Bytes.toBytes(cloneName), numRows);
// test that we can delete the snapshot
UTIL.deleteTable(cloneName);
admin.deleteSnapshot(snapshotName);
// make sure we don't have any snapshots
SnapshotTestingUtils.assertNoSnapshots(admin);
}
/**
* Basic end-to-end test of simple-flush-based snapshots
*/
@ -259,7 +373,7 @@ public class TestFlushSnapshotFromClient {
// make sure we don't fail on listing snapshots
SnapshotTestingUtils.assertNoSnapshots(admin);
// load the table so we have some data
UTIL.loadTable(new HTable(UTIL.getConfiguration(), TABLE_NAME), TEST_FAM);
loadData(new HTable(UTIL.getConfiguration(), TABLE_NAME), 10000, TEST_FAM);
// and wait until everything stabilizes
waitForTableToBeOnline(TABLE_NAME);
@ -289,6 +403,7 @@ public class TestFlushSnapshotFromClient {
// check the region snapshot for all the regions
List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
assertTrue(regions.size() > 1);
for (HRegionInfo info : regions) {
String regionName = info.getEncodedName();
Path regionDir = new Path(snapshotDir, regionName);
@ -297,6 +412,7 @@ public class TestFlushSnapshotFromClient {
// check to make sure we have the family
Path familyDir = new Path(regionDir, Bytes.toString(TEST_FAM));
assertTrue(fs.exists(familyDir));
// make sure we have some file references
assertTrue(fs.listStatus(familyDir).length > 0);
}
@ -326,10 +442,10 @@ public class TestFlushSnapshotFromClient {
// make sure we don't fail on listing snapshots
SnapshotTestingUtils.assertNoSnapshots(admin);
// create second testing table
UTIL.createTable(TABLE2_NAME, TEST_FAM);
createTable(TABLE2_NAME, TEST_FAM);
// load the table so we have some data
UTIL.loadTable(new HTable(UTIL.getConfiguration(), TABLE_NAME), TEST_FAM);
UTIL.loadTable(new HTable(UTIL.getConfiguration(), TABLE2_NAME), TEST_FAM);
loadData(new HTable(UTIL.getConfiguration(), TABLE_NAME), 10000, TEST_FAM);
loadData(new HTable(UTIL.getConfiguration(), TABLE2_NAME), 10000, TEST_FAM);
// and wait until everything stabilizes
waitForTableToBeOnline(TABLE_NAME);
waitForTableToBeOnline(TABLE2_NAME);
@ -435,5 +551,70 @@ public class TestFlushSnapshotFromClient {
for (HRegion region : onlineRegions) {
region.waitForFlushesAndCompactions();
}
UTIL.getHBaseAdmin().isTableAvailable(tableName);
}
private void waitRegionsAfterMerge(final long numRegionsAfterMerge)
throws IOException, InterruptedException {
HBaseAdmin admin = UTIL.getHBaseAdmin();
// Verify that there's one region less
long startTime = System.currentTimeMillis();
while (admin.getTableRegions(TABLE_NAME).size() != numRegionsAfterMerge) {
// This may be flaky... if after 15sec the merge is not complete give up
// it will fail in the assertEquals(numRegionsAfterMerge).
if ((System.currentTimeMillis() - startTime) > 15000)
break;
Thread.sleep(100);
}
waitForTableToBeOnline(TABLE_NAME);
}
private void createTable(final byte[] tableName, final byte[]... families) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
for (byte[] family: families) {
HColumnDescriptor hcd = new HColumnDescriptor(family);
htd.addFamily(hcd);
}
byte[][] splitKeys = new byte[14][];
byte[] hex = Bytes.toBytes("123456789abcde");
for (int i = 0; i < splitKeys.length; ++i) {
splitKeys[i] = new byte[] { hex[i] };
}
UTIL.getHBaseAdmin().createTable(htd, splitKeys);
}
public void loadData(final HTable table, int rows, byte[]... families) throws IOException {
table.setAutoFlush(false);
assertTrue(rows >= 16);
for (char k: "0123456789abcdef".toCharArray()) {
byte[] value = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(System.currentTimeMillis()));
byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
putData(table, families, key, value);
rows--;
}
while (rows-- > 0) {
byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows));
byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
putData(table, families, key, value);
}
table.flushCommits();
}
private void putData(final HTable table, final byte[][] families,
final byte[] key, final byte[] value) throws IOException {
Put put = new Put(key);
put.setDurability(Durability.SKIP_WAL);
for (byte[] family: families) {
put.add(family, TEST_QUAL, value);
}
table.put(put);
}
private void verifyRowCount(final byte[] tableName, long expectedRows) throws IOException {
HTable table = new HTable(UTIL.getConfiguration(), tableName);
assertEquals(expectedRows, UTIL.countRows(table));
table.close();
}
}