HBASE-12718 Convert TestAcidGuarantees from a unit test to an integration test

Jonathan M Hsieh 2014-12-17 16:57:27 -08:00
parent c1eacd6221
commit 4a6f1d9d6b
1 changed file with 94 additions and 122 deletions
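
The converted test extends IntegrationTestBase, so it is driven through ToolRunner against a real cluster; runTestFromCommandLine() in the diff below reads its workload knobs (millis, numWriters, numGetters, numScanners, numUniqueRows) from the Configuration. Below is a minimal launcher sketch, assuming only the class and property names that appear in this diff; the RunAcidGuaranteesIT wrapper class itself is hypothetical.

// Hypothetical launcher, for illustration only; the property names match those
// read by runTestFromCommandLine() in the diff below.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestAcidGuarantees;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.util.ToolRunner;

public class RunAcidGuaranteesIT {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf); // target a real cluster
    conf.setInt("millis", 60000);      // run the concurrent workload for one minute
    conf.setInt("numWriters", 50);     // threads doing full-row multi-family Puts
    conf.setInt("numGetters", 2);      // threads doing single-row Gets
    conf.setInt("numScanners", 2);     // threads doing full-table Scans
    conf.setInt("numUniqueRows", 3);   // rows contended by all threads
    System.exit(ToolRunner.run(conf, new IntegrationTestAcidGuarantees(), args));
  }
}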


@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -18,89 +17,100 @@
  */
 package org.apache.hadoop.hbase;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 /**
- * Test case that uses multiple threads to read and write multifamily rows
- * into a table, verifying that reads never see partially-complete writes.
- *
- * This can run as a junit test, or with a main() function which runs against
- * a real cluster (eg for testing with failures, region movement, etc)
+ * This Integration Test verifies acid guarantees across column families by frequently writing
+ * values to rows with multiple column families and concurrently reading entire rows that expect all
+ * column families.
  */
-@Category(MediumTests.class)
-public class TestAcidGuarantees implements Tool {
-  protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
-  public static final byte [] TABLE_NAME = Bytes.toBytes("TestAcidGuarantees");
+@Category(IntegrationTests.class)
+public class IntegrationTestAcidGuarantees extends IntegrationTestBase {
+  protected static final Log LOG = LogFactory.getLog(IntegrationTestAcidGuarantees.class);
+  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
+  public static final TableName TABLE_NAME =
+      TableName.valueOf(IntegrationTestAcidGuarantees.class.getSimpleName());
   public static final byte [] FAMILY_A = Bytes.toBytes("A");
   public static final byte [] FAMILY_B = Bytes.toBytes("B");
   public static final byte [] FAMILY_C = Bytes.toBytes("C");
-  public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
   public static final byte[][] FAMILIES = new byte[][] {
     FAMILY_A, FAMILY_B, FAMILY_C };
-  private HBaseTestingUtility util;
   public static int NUM_COLS_TO_CHECK = 50;
-  // when run as main
-  private Configuration conf;
+  protected IntegrationTestingUtility util;
-  private void createTableIfMissing()
-    throws IOException {
-    try {
-      util.createTable(TABLE_NAME, FAMILIES);
-    } catch (TableExistsException tee) {
-    }
+  // **** extends IntegrationTestBase
+  @Override
+  public int runTestFromCommandLine() throws Exception {
+    Configuration c = getConf();
+    int millis = c.getInt("millis", 5000);
+    int numWriters = c.getInt("numWriters", 50);
+    int numGetters = c.getInt("numGetters", 2);
+    int numScanners = c.getInt("numScanners", 2);
+    int numUniqueRows = c.getInt("numUniqueRows", 3);
+    runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true);
+    return 0;
   }
-  public TestAcidGuarantees() {
+  @Override
+  public void setUpCluster() throws Exception {
     // Set small flush size for minicluster so we exercise reseeking scanners
-    Configuration conf = HBaseConfiguration.create();
+    util = getTestingUtil(getConf());
+    util.initializeCluster(SERVER_COUNT);
+    conf = getConf();
     conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
     // prevent aggressive region split
     conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
         ConstantSizeRegionSplitPolicy.class.getName());
-    util = new HBaseTestingUtility(conf);
+    this.setConf(util.getConfiguration());
   }
+  @Override
+  public TableName getTablename() {
+    return TABLE_NAME;
+  }
+  @Override
+  protected Set<String> getColumnFamilies() {
+    return Sets.newHashSet(String.valueOf(FAMILY_A), String.valueOf(FAMILY_B),
+        String.valueOf(FAMILY_C));
+  }
+  // **** core
   /**
    * Thread that does random full-row writes into a table.
    */
-  public static class AtomicityWriter extends RepeatingTestThread {
+  public static class AtomicityWriter extends MultithreadedTestUtil.RepeatingTestThread {
     Random rand = new Random();
     byte data[] = new byte[10];
     byte targetRows[][];
     byte targetFamilies[][];
-    HTable table;
+    Table table;
     AtomicLong numWritten = new AtomicLong();
-    public AtomicityWriter(TestContext ctx, byte targetRows[][],
+    public AtomicityWriter(MultithreadedTestUtil.TestContext ctx, byte targetRows[][],
         byte targetFamilies[][]) throws IOException {
       super(ctx);
       this.targetRows = targetRows;
@@ -128,14 +138,14 @@ public class TestAcidGuarantees implements Tool {
    * Thread that does single-row reads in a table, looking for partially
    * completed rows.
    */
-  public static class AtomicGetReader extends RepeatingTestThread {
+  public static class AtomicGetReader extends MultithreadedTestUtil.RepeatingTestThread {
     byte targetRow[];
     byte targetFamilies[][];
-    HTable table;
+    Table table;
     int numVerified = 0;
     AtomicLong numRead = new AtomicLong();
-    public AtomicGetReader(TestContext ctx, byte targetRow[],
+    public AtomicGetReader(MultithreadedTestUtil.TestContext ctx, byte targetRow[],
         byte targetFamilies[][]) throws IOException {
       super(ctx);
       this.targetRow = targetRow;
@@ -187,14 +197,14 @@ public class TestAcidGuarantees implements Tool {
    * Thread that does full scans of the table looking for any partially completed
    * rows.
    */
-  public static class AtomicScanReader extends RepeatingTestThread {
+  public static class AtomicScanReader extends MultithreadedTestUtil.RepeatingTestThread {
     byte targetFamilies[][];
-    HTable table;
+    Table table;
     AtomicLong numScans = new AtomicLong();
     AtomicLong numRowsScanned = new AtomicLong();
-    public AtomicScanReader(TestContext ctx,
+    public AtomicScanReader(MultithreadedTestUtil.TestContext ctx,
         byte targetFamilies[][]) throws IOException {
       super(ctx);
       this.targetFamilies = targetFamilies;
       table = new HTable(ctx.getConf(), TABLE_NAME);
@@ -241,21 +251,29 @@ public class TestAcidGuarantees implements Tool {
   }
   public void runTestAtomicity(long millisToRun,
       int numWriters,
       int numGetters,
       int numScanners,
       int numUniqueRows) throws Exception {
     runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false);
   }
+  private void createTableIfMissing()
+    throws IOException {
+    try {
+      util.createTable(TABLE_NAME, FAMILIES);
+    } catch (TableExistsException tee) {
+    }
+  }
   public void runTestAtomicity(long millisToRun,
       int numWriters,
       int numGetters,
       int numScanners,
       int numUniqueRows,
       final boolean systemTest) throws Exception {
     createTableIfMissing();
-    TestContext ctx = new TestContext(util.getConfiguration());
+    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(util.getConfiguration());
     byte rows[][] = new byte[numUniqueRows][];
     for (int i = 0; i < numUniqueRows; i++) {
@@ -265,13 +283,13 @@ public class TestAcidGuarantees implements Tool {
     List<AtomicityWriter> writers = Lists.newArrayList();
     for (int i = 0; i < numWriters; i++) {
       AtomicityWriter writer = new AtomicityWriter(
          ctx, rows, FAMILIES);
       writers.add(writer);
       ctx.addThread(writer);
     }
     // Add a flusher
-    ctx.addThread(new RepeatingTestThread(ctx) {
-      HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
+    ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) {
+      Admin admin = util.getHBaseAdmin();
       public void doAnAction() throws Exception {
         try {
           admin.flush(TABLE_NAME);
@@ -293,7 +311,7 @@ public class TestAcidGuarantees implements Tool {
     List<AtomicGetReader> getters = Lists.newArrayList();
     for (int i = 0; i < numGetters; i++) {
       AtomicGetReader getter = new AtomicGetReader(
          ctx, rows[i % numUniqueRows], FAMILIES);
       getters.add(getter);
       ctx.addThread(getter);
     }
@@ -326,73 +344,27 @@ public class TestAcidGuarantees implements Tool {
   @Test
   public void testGetAtomicity() throws Exception {
-    util.startMiniCluster(1);
-    try {
-      runTestAtomicity(20000, 5, 5, 0, 3);
-    } finally {
-      util.shutdownMiniCluster();
-    }
+    runTestAtomicity(20000, 5, 5, 0, 3);
   }
   @Test
   public void testScanAtomicity() throws Exception {
-    util.startMiniCluster(1);
-    try {
-      runTestAtomicity(20000, 5, 0, 5, 3);
-    } finally {
-      util.shutdownMiniCluster();
-    }
+    runTestAtomicity(20000, 5, 0, 5, 3);
   }
   @Test
   public void testMixedAtomicity() throws Exception {
-    util.startMiniCluster(1);
-    try {
-      runTestAtomicity(20000, 5, 2, 2, 3);
-    } finally {
-      util.shutdownMiniCluster();
-    }
+    runTestAtomicity(20000, 5, 2, 2, 3);
   }
-  ////////////////////////////////////////////////////////////////////////////
-  // Tool interface
-  ////////////////////////////////////////////////////////////////////////////
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-  @Override
-  public void setConf(Configuration c) {
-    this.conf = c;
-    this.util = new HBaseTestingUtility(c);
-  }
-  @Override
-  public int run(String[] arg0) throws Exception {
-    Configuration c = getConf();
-    int millis = c.getInt("millis", 5000);
-    int numWriters = c.getInt("numWriters", 50);
-    int numGetters = c.getInt("numGetters", 2);
-    int numScanners = c.getInt("numScanners", 2);
-    int numUniqueRows = c.getInt("numUniqueRows", 3);
-    runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true);
-    return 0;
-  }
-  public static void main(String args[]) throws Exception {
-    Configuration c = HBaseConfiguration.create();
-    int status;
-    try {
-      TestAcidGuarantees test = new TestAcidGuarantees();
-      status = ToolRunner.run(c, test, args);
-    } catch (Exception e) {
-      LOG.error("Exiting due to error", e);
-      status = -1;
-    }
-    System.exit(status);
-  }
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    IntegrationTestingUtility.setUseDistributedCluster(conf);
+    int ret = ToolRunner.run(conf, new IntegrationTestAcidGuarantees(), args);
+    System.exit(ret);
   }
 }
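
For context on what the writer and reader threads above are checking: a single Put that spans several column families must become visible atomically, so a concurrent Get or Scan of that row should never see the families disagree. The following is a conceptual sketch of that pattern, assuming the TABLE_NAME and FAMILIES constants from the diff and the Connection-based client API (Table, Put.addColumn) of HBase 1.0+; it is an illustration of the guarantee under test, not the test code itself.

// Conceptual sketch of the writer/reader pattern the test exercises (not the test code).
// Assumed names: the TABLE_NAME and FAMILY_A/B/C constants defined in the diff above,
// an already-open Connection, and the HBase 1.0+ client API.
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;

final class AcidCheckSketch { // hypothetical helper, for illustration only
  static void writeAndVerify(Connection connection, TableName tableName,
      byte[][] families, byte[] row) throws IOException {
    byte[] qual = Bytes.toBytes("col0");
    byte[] value = Bytes.toBytes(System.currentTimeMillis());
    try (Table table = connection.getTable(tableName)) {
      // Writer side: one Put carries the same value for every family, so the row
      // mutation must become visible atomically across A, B and C.
      Put put = new Put(row);
      for (byte[] family : families) {
        put.addColumn(family, qual, value);
      }
      table.put(put);

      // Reader side: fetch the whole row and require all families to agree;
      // a mismatch would mean a partially visible write, i.e. a broken guarantee.
      Result result = table.get(new Get(row));
      byte[] first = result.getValue(families[0], qual);
      for (byte[] family : families) {
        if (!Bytes.equals(first, result.getValue(family, qual))) {
          throw new AssertionError("Partially committed row visible to a reader");
        }
      }
    }
  }
}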