HBASE-7377 Clean up TestHBase7051

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1423674 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
gchanan 2012-12-18 21:33:52 +00:00
parent 96f6aa3b95
commit 344ae36d07
1 changed files with 82 additions and 69 deletions

View File

@ -16,6 +16,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -30,19 +33,32 @@ import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
/**
* Test of HBASE-7051; that checkAndPuts and puts behave atomically with respect to each other.
* Rather than perform a bunch of trials to verify atomicity, this test recreates a race condition
* that causes the test to fail if checkAndPut doesn't wait for outstanding put transactions
* to complete. It does this by invasively overriding HRegion function to affect the timing of
* the operations.
*/
@Category(SmallTests.class) @Category(SmallTests.class)
public class TestHBase7051 { public class TestHBase7051 {
private static volatile boolean putCompleted = false;
private static CountDownLatch latch = new CountDownLatch(1); private static CountDownLatch latch = new CountDownLatch(1);
private boolean checkAndPutCompleted = false; private enum TestStep {
private static int count = 0; INIT, // initial put of 10 to set value of the cell
PUT_STARTED, // began doing a put of 50 to cell
PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC).
CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11
CHECKANDPUT_COMPLETED // completed checkAndPut
// NOTE: at the end of these steps, the value of the cell should be 50, not 11!
}
private static volatile TestStep testStep = TestStep.INIT;
private final String family = "f1";
@Test @Test
public void testPutAndCheckAndPutInParallel() throws Exception { public void testPutAndCheckAndPutInParallel() throws Exception {
final String tableName = "testPutAndCheckAndPut"; final String tableName = "testPutAndCheckAndPut";
final String family = "f1";
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class); conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
final MockHRegion region = (MockHRegion) TestHRegion.initHRegion(Bytes.toBytes(tableName), final MockHRegion region = (MockHRegion) TestHRegion.initHRegion(Bytes.toBytes(tableName),
@ -57,14 +73,16 @@ public class TestHBase7051 {
putsAndLocks.add(pair); putsAndLocks.add(pair);
count++;
region.batchMutate(putsAndLocks.toArray(new Pair[0])); region.batchMutate(putsAndLocks.toArray(new Pair[0]));
makeCheckAndPut(family, region); MultithreadedTestUtil.TestContext ctx =
new MultithreadedTestUtil.TestContext(conf);
makePut(family, region); ctx.addThread(new PutThread(ctx, region));
while (!checkAndPutCompleted) { ctx.addThread(new CheckAndPutThread(ctx, region));
ctx.startThreads();
while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
Thread.sleep(100); Thread.sleep(100);
} }
ctx.stop();
Scan s = new Scan(); Scan s = new Scan();
RegionScanner scanner = region.getScanner(s); RegionScanner scanner = region.getScanner(s);
List<KeyValue> results = new ArrayList<KeyValue>(); List<KeyValue> results = new ArrayList<KeyValue>();
@ -75,55 +93,47 @@ public class TestHBase7051 {
} }
private void makePut(final String family, final MockHRegion region) { private class PutThread extends TestThread {
new Thread() { private MockHRegion region;
public void run() { PutThread(TestContext ctx, MockHRegion region) {
super(ctx);
this.region = region;
}
public void doWork() throws Exception {
List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList(); List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
Put[] puts = new Put[1]; Put[] puts = new Put[1];
Put put = new Put(Bytes.toBytes("r1")); Put put = new Put(Bytes.toBytes("r1"));
put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50")); put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
puts[0] = put; puts[0] = put;
try {
Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[0], null); Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[0], null);
putsAndLocks.add(pair); putsAndLocks.add(pair);
count++; testStep = TestStep.PUT_STARTED;
region.batchMutate(putsAndLocks.toArray(new Pair[0])); region.batchMutate(putsAndLocks.toArray(new Pair[0]));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} }
} }
}.start();
}
private void makeCheckAndPut(final String family, final MockHRegion region) { private class CheckAndPutThread extends TestThread {
new Thread() { private MockHRegion region;
CheckAndPutThread(TestContext ctx, MockHRegion region) {
super(ctx);
this.region = region;
}
public void run() { public void doWork() throws Exception {
Put[] puts = new Put[1]; Put[] puts = new Put[1];
Put put = new Put(Bytes.toBytes("r1")); Put put = new Put(Bytes.toBytes("r1"));
put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11")); put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
puts[0] = put; puts[0] = put;
try { while (testStep != TestStep.PUT_COMPLETED) {
while (putCompleted == false) {
try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} }
} testStep = TestStep.CHECKANDPUT_STARTED;
count++;
region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"), region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, null, true); CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, null, true);
checkAndPutCompleted = true; testStep = TestStep.CHECKANDPUT_COMPLETED;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} }
} }
}.start();
}
public static class MockHRegion extends HRegion { public static class MockHRegion extends HRegion {
@ -134,36 +144,39 @@ public class TestHBase7051 {
@Override @Override
public void releaseRowLock(Integer lockId) { public void releaseRowLock(Integer lockId) {
if (count == 1) { if (testStep == TestStep.INIT) {
super.releaseRowLock(lockId); super.releaseRowLock(lockId);
return; return;
} }
if (count == 2) { if (testStep == TestStep.PUT_STARTED) {
try { try {
putCompleted = true; testStep = TestStep.PUT_COMPLETED;
super.releaseRowLock(lockId); super.releaseRowLock(lockId);
// put has been written to the memstore and the row lock has been released, but the
// MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
// operations would cause the non-atomicity to show up:
// 1) Put releases row lock (where we are now)
// 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
// because the MVCC has not advanced
// 3) Put advances MVCC
// So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
// (see below), and then wait some more to give the checkAndPut time to read the old
// value.
latch.await(); latch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (count == 3) {
super.releaseRowLock(lockId);
try {
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// TODO Auto-generated catch block Thread.currentThread().interrupt();
e.printStackTrace();
} }
latch.countDown(); }
else if (testStep == TestStep.CHECKANDPUT_STARTED) {
super.releaseRowLock(lockId);
} }
} }
@Override @Override
public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException { public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException {
if (count == 3) { if (testStep == TestStep.CHECKANDPUT_STARTED) {
latch.countDown(); latch.countDown();
} }
return super.getLock(lockid, row, waitForLock); return super.getLock(lockid, row, waitForLock);