HBASE-2670. Provide atomicity for readers even when new insert has same timestamp as current row.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@955076 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2010-06-15 22:37:56 +00:00
parent 71f2632f64
commit fef909299a
7 changed files with 319 additions and 35 deletions

View File

@ -386,6 +386,8 @@ Release 0.21.0 - Unreleased
HBASE-2712 Cached region location that went stale won't recover if HBASE-2712 Cached region location that went stale won't recover if
asking for first row asking for first row
HBASE-2732 TestZooKeeper was broken, HBASE-2691 showed it HBASE-2732 TestZooKeeper was broken, HBASE-2691 showed it
HBASE-2670 Provide atomicity for readers even when new insert has
same timestamp as current row.
IMPROVEMENTS IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable HBASE-1760 Cleanup TODOs in HTable

View File

@ -25,6 +25,7 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Comparator; import java.util.Comparator;
import com.google.common.primitives.Longs;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
@ -553,7 +554,12 @@ public class KeyValue implements Writable, HeapSize {
public KeyValue clone() { public KeyValue clone() {
byte [] b = new byte[this.length]; byte [] b = new byte[this.length];
System.arraycopy(this.bytes, this.offset, b, 0, this.length); System.arraycopy(this.bytes, this.offset, b, 0, this.length);
return new KeyValue(b, 0, b.length); KeyValue ret = new KeyValue(b, 0, b.length);
// Important to clone the memstoreTS as well - otherwise memstore's
// update-in-place methods (eg increment) will end up creating
// new entries
ret.setMemstoreTS(memstoreTS);
return ret;
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
@ -1294,10 +1300,13 @@ public class KeyValue implements Writable, HeapSize {
} }
public int compare(final KeyValue left, final KeyValue right) { public int compare(final KeyValue left, final KeyValue right) {
return getRawComparator().compare(left.getBuffer(), int ret = getRawComparator().compare(left.getBuffer(),
left.getOffset() + ROW_OFFSET, left.getKeyLength(), left.getOffset() + ROW_OFFSET, left.getKeyLength(),
right.getBuffer(), right.getOffset() + ROW_OFFSET, right.getBuffer(), right.getOffset() + ROW_OFFSET,
right.getKeyLength()); right.getKeyLength());
if (ret != 0) return ret;
// Negate this comparison so later edits show up first
return -Longs.compare(left.getMemstoreTS(), right.getMemstoreTS());
} }
public int compareTimestamps(final KeyValue left, final KeyValue right) { public int compareTimestamps(final KeyValue left, final KeyValue right) {

View File

@ -57,16 +57,11 @@ import java.io.UnsupportedEncodingException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.Set; import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;

View File

@ -22,10 +22,15 @@ package org.apache.hadoop.hbase;
import java.util.Set; import java.util.Set;
import java.util.HashSet; import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
public abstract class MultithreadedTestUtil { public abstract class MultithreadedTestUtil {
public static final Log LOG =
LogFactory.getLog(MultithreadedTestUtil.class);
public static class TestContext { public static class TestContext {
private final Configuration conf; private final Configuration conf;
private Throwable err = null; private Throwable err = null;
@ -74,6 +79,7 @@ public abstract class MultithreadedTestUtil {
public synchronized void threadFailed(Throwable t) { public synchronized void threadFailed(Throwable t) {
if (err == null) err = t; if (err == null) err = t;
LOG.error("Failed!", err);
notify(); notify();
} }

View File

@ -33,11 +33,21 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
/**
* Test case that uses multiple threads to read and write multifamily rows
* into a table, verifying that reads never see partially-complete writes.
*
* This can run as a junit test, or with a main() function which runs against
* a real cluster (eg for testing with failures, region movement, etc)
*/
public class TestAcidGuarantees { public class TestAcidGuarantees {
protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class); protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
public static final byte [] TABLE_NAME = Bytes.toBytes("TestAcidGuarantees"); public static final byte [] TABLE_NAME = Bytes.toBytes("TestAcidGuarantees");
@ -62,25 +72,33 @@ public class TestAcidGuarantees {
} }
public TestAcidGuarantees() { public TestAcidGuarantees() {
util = new HBaseTestingUtility(); // Set small flush size for minicluster so we exercise reseeking scanners
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.hregion.memstore.flush.size", String.valueOf(128*1024));
util = new HBaseTestingUtility(conf);
} }
/**
* Thread that does random full-row writes into a table.
*/
public static class AtomicityWriter extends RepeatingTestThread { public static class AtomicityWriter extends RepeatingTestThread {
Random rand = new Random(); Random rand = new Random();
byte data[] = new byte[10]; byte data[] = new byte[10];
byte targetRow[]; byte targetRows[][];
byte targetFamilies[][]; byte targetFamilies[][];
HTable table; HTable table;
AtomicLong numWritten = new AtomicLong(); AtomicLong numWritten = new AtomicLong();
public AtomicityWriter(TestContext ctx, byte targetRow[], public AtomicityWriter(TestContext ctx, byte targetRows[][],
byte targetFamilies[][]) throws IOException { byte targetFamilies[][]) throws IOException {
super(ctx); super(ctx);
this.targetRow = targetRow; this.targetRows = targetRows;
this.targetFamilies = targetFamilies; this.targetFamilies = targetFamilies;
table = new HTable(ctx.getConf(), TABLE_NAME); table = new HTable(ctx.getConf(), TABLE_NAME);
} }
public void doAnAction() throws Exception { public void doAnAction() throws Exception {
// Pick a random row to write into
byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
Put p = new Put(targetRow); Put p = new Put(targetRow);
rand.nextBytes(data); rand.nextBytes(data);
@ -95,14 +113,18 @@ public class TestAcidGuarantees {
} }
} }
public static class AtomicityReader extends RepeatingTestThread { /**
* Thread that does single-row reads in a table, looking for partially
* completed rows.
*/
public static class AtomicGetReader extends RepeatingTestThread {
byte targetRow[]; byte targetRow[];
byte targetFamilies[][]; byte targetFamilies[][];
HTable table; HTable table;
int numVerified = 0; int numVerified = 0;
AtomicLong numRead = new AtomicLong(); AtomicLong numRead = new AtomicLong();
public AtomicityReader(TestContext ctx, byte targetRow[], public AtomicGetReader(TestContext ctx, byte targetRow[],
byte targetFamilies[][]) throws IOException { byte targetFamilies[][]) throws IOException {
super(ctx); super(ctx);
this.targetRow = targetRow; this.targetRow = targetRow;
@ -114,6 +136,12 @@ public class TestAcidGuarantees {
Get g = new Get(targetRow); Get g = new Get(targetRow);
Result res = table.get(g); Result res = table.get(g);
byte[] gotValue = null; byte[] gotValue = null;
if (res.getRow() == null) {
// Trying to verify but we didn't find the row - the writing
// thread probably just hasn't started writing yet, so we can
// ignore this action
return;
}
for (byte[] family : targetFamilies) { for (byte[] family : targetFamilies) {
for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
@ -144,24 +172,98 @@ public class TestAcidGuarantees {
} }
} }
/**
* Thread that does full scans of the table looking for any partially completed
* rows.
*/
public static class AtomicScanReader extends RepeatingTestThread {
byte targetFamilies[][];
HTable table;
AtomicLong numScans = new AtomicLong();
AtomicLong numRowsScanned = new AtomicLong();
public void runTestAtomicity(long millisToRun) throws Exception { public AtomicScanReader(TestContext ctx,
byte targetFamilies[][]) throws IOException {
super(ctx);
this.targetFamilies = targetFamilies;
table = new HTable(ctx.getConf(), TABLE_NAME);
}
public void doAnAction() throws Exception {
Scan s = new Scan();
for (byte[] family : targetFamilies) {
s.addFamily(family);
}
ResultScanner scanner = table.getScanner(s);
for (Result res : scanner) {
byte[] gotValue = null;
for (byte[] family : targetFamilies) {
for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
byte qualifier[] = Bytes.toBytes("col" + i);
byte thisValue[] = res.getValue(family, qualifier);
if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
gotFailure(gotValue, res);
}
gotValue = thisValue;
}
}
numRowsScanned.getAndIncrement();
}
numScans.getAndIncrement();
}
private void gotFailure(byte[] expected, Result res) {
StringBuilder msg = new StringBuilder();
msg.append("Failed after ").append(numRowsScanned).append("!");
msg.append("Expected=").append(Bytes.toStringBinary(expected));
msg.append("Got:\n");
for (KeyValue kv : res.list()) {
msg.append(kv.toString());
msg.append(" val= ");
msg.append(Bytes.toStringBinary(kv.getValue()));
msg.append("\n");
}
throw new RuntimeException(msg.toString());
}
}
public void runTestAtomicity(long millisToRun,
int numWriters,
int numGetters,
int numScanners,
int numUniqueRows) throws Exception {
createTableIfMissing(); createTableIfMissing();
TestContext ctx = new TestContext(util.getConfiguration()); TestContext ctx = new TestContext(util.getConfiguration());
byte row[] = Bytes.toBytes("test_row");
byte rows[][] = new byte[numUniqueRows][];
for (int i = 0; i < numUniqueRows; i++) {
rows[i] = Bytes.toBytes("test_row_" + i);
}
List<AtomicityWriter> writers = Lists.newArrayList(); List<AtomicityWriter> writers = Lists.newArrayList();
for (int i = 0; i < 5; i++) { for (int i = 0; i < numWriters; i++) {
AtomicityWriter writer = new AtomicityWriter(ctx, row, FAMILIES); AtomicityWriter writer = new AtomicityWriter(
ctx, rows, FAMILIES);
writers.add(writer); writers.add(writer);
ctx.addThread(writer); ctx.addThread(writer);
} }
List<AtomicityReader> readers = Lists.newArrayList(); List<AtomicGetReader> getters = Lists.newArrayList();
for (int i = 0; i < 5; i++) { for (int i = 0; i < numGetters; i++) {
AtomicityReader reader = new AtomicityReader(ctx, row, FAMILIES); AtomicGetReader getter = new AtomicGetReader(
readers.add(reader); ctx, rows[i % numUniqueRows], FAMILIES);
ctx.addThread(reader); getters.add(getter);
ctx.addThread(getter);
}
List<AtomicScanReader> scanners = Lists.newArrayList();
for (int i = 0; i < numScanners; i++) {
AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
scanners.add(scanner);
ctx.addThread(scanner);
} }
ctx.startThreads(); ctx.startThreads();
@ -173,16 +275,43 @@ public class TestAcidGuarantees {
LOG.info(" wrote " + writer.numWritten.get()); LOG.info(" wrote " + writer.numWritten.get());
} }
LOG.info("Readers:"); LOG.info("Readers:");
for (AtomicityReader reader : readers) { for (AtomicGetReader reader : getters) {
LOG.info(" read " + reader.numRead.get()); LOG.info(" read " + reader.numRead.get());
} }
LOG.info("Scanners:");
for (AtomicScanReader scanner : scanners) {
LOG.info(" scanned " + scanner.numScans.get());
LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
}
} }
@Test @Test
public void testAtomicity() throws Exception { public void testGetAtomicity() throws Exception {
util.startMiniCluster(3); util.startMiniCluster(1);
try { try {
runTestAtomicity(20000); runTestAtomicity(20000, 5, 5, 0, 3);
} finally {
util.shutdownMiniCluster();
}
}
@Test
@Ignore("Currently not passing - see HBASE-2670")
public void testScanAtomicity() throws Exception {
util.startMiniCluster(1);
try {
runTestAtomicity(20000, 5, 0, 5, 3);
} finally {
util.shutdownMiniCluster();
}
}
@Test
@Ignore("Currently not passing - see HBASE-2670")
public void testMixedAtomicity() throws Exception {
util.startMiniCluster(1);
try {
runTestAtomicity(20000, 5, 2, 2, 3);
} finally { } finally {
util.shutdownMiniCluster(); util.shutdownMiniCluster();
} }
@ -192,7 +321,7 @@ public class TestAcidGuarantees {
Configuration c = HBaseConfiguration.create(); Configuration c = HBaseConfiguration.create();
TestAcidGuarantees test = new TestAcidGuarantees(); TestAcidGuarantees test = new TestAcidGuarantees();
test.setConf(c); test.setConf(c);
test.runTestAtomicity(5*60*1000); test.runTestAtomicity(5*60*1000, 5, 2, 2, 3);
} }
private void setConf(Configuration c) { private void setConf(Configuration c) {

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.rmi.UnexpectedException; import java.rmi.UnexpectedException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.TreeSet; import java.util.TreeSet;
@ -38,6 +39,10 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
/** memstore test case */ /** memstore test case */
public class TestMemStore extends TestCase { public class TestMemStore extends TestCase {
private final Log LOG = LogFactory.getLog(this.getClass()); private final Log LOG = LogFactory.getLog(this.getClass());
@ -204,11 +209,18 @@ public class TestMemStore extends TestCase {
private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected)
throws IOException { throws IOException {
scanner.seek(KeyValue.createFirstOnRow(new byte[]{})); scanner.seek(KeyValue.createFirstOnRow(new byte[]{}));
for (KeyValue kv : expected) { List<KeyValue> returned = Lists.newArrayList();
assertTrue(0 ==
KeyValue.COMPARATOR.compare(kv, while (true) {
scanner.next())); KeyValue next = scanner.next();
if (next == null) break;
returned.add(next);
} }
assertTrue(
"Got:\n" + Joiner.on("\n").join(returned) +
"\nExpected:\n" + Joiner.on("\n").join(expected),
Iterables.elementsEqual(Arrays.asList(expected), returned));
assertNull(scanner.peek()); assertNull(scanner.peek());
} }
@ -252,6 +264,115 @@ public class TestMemStore extends TestCase {
assertScannerResults(s, new KeyValue[]{kv1, kv2}); assertScannerResults(s, new KeyValue[]{kv1, kv2});
} }
/**
* Regression test for HBASE-2616, HBASE-2670.
* When we insert a higher-memstoreTS version of a cell but with
* the same timestamp, we still need to provide consistent reads
* for the same scanner.
*/
public void testMemstoreEditsVisibilityWithSameKey() throws IOException {
final byte[] row = Bytes.toBytes(1);
final byte[] f = Bytes.toBytes("family");
final byte[] q1 = Bytes.toBytes("q1");
final byte[] q2 = Bytes.toBytes("q2");
final byte[] v1 = Bytes.toBytes("value1");
final byte[] v2 = Bytes.toBytes("value2");
// INSERT 1: Write both columns val1
ReadWriteConsistencyControl.WriteEntry w =
rwcc.beginMemstoreInsert();
KeyValue kv11 = new KeyValue(row, f, q1, v1);
kv11.setMemstoreTS(w.getWriteNumber());
memstore.add(kv11);
KeyValue kv12 = new KeyValue(row, f, q2, v1);
kv12.setMemstoreTS(w.getWriteNumber());
memstore.add(kv12);
rwcc.completeMemstoreInsert(w);
// BEFORE STARTING INSERT 2, SEE FIRST KVS
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
KeyValueScanner s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// START INSERT 2: Write both columns val2
w = rwcc.beginMemstoreInsert();
KeyValue kv21 = new KeyValue(row, f, q1, v2);
kv21.setMemstoreTS(w.getWriteNumber());
memstore.add(kv21);
KeyValue kv22 = new KeyValue(row, f, q2, v2);
kv22.setMemstoreTS(w.getWriteNumber());
memstore.add(kv22);
// BEFORE COMPLETING INSERT 2, SEE FIRST KVS
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// COMPLETE INSERT 2
rwcc.completeMemstoreInsert(w);
// NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS.
// See HBASE-1485 for discussion about what we should do with
// the duplicate-TS inserts
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12});
}
/**
* When we insert a higher-memstoreTS deletion of a cell but with
* the same timestamp, we still need to provide consistent reads
* for the same scanner.
*/
public void testMemstoreDeletesVisibilityWithSameKey() throws IOException {
final byte[] row = Bytes.toBytes(1);
final byte[] f = Bytes.toBytes("family");
final byte[] q1 = Bytes.toBytes("q1");
final byte[] q2 = Bytes.toBytes("q2");
final byte[] v1 = Bytes.toBytes("value1");
// INSERT 1: Write both columns val1
ReadWriteConsistencyControl.WriteEntry w =
rwcc.beginMemstoreInsert();
KeyValue kv11 = new KeyValue(row, f, q1, v1);
kv11.setMemstoreTS(w.getWriteNumber());
memstore.add(kv11);
KeyValue kv12 = new KeyValue(row, f, q2, v1);
kv12.setMemstoreTS(w.getWriteNumber());
memstore.add(kv12);
rwcc.completeMemstoreInsert(w);
// BEFORE STARTING INSERT 2, SEE FIRST KVS
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
KeyValueScanner s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// START DELETE: Insert delete for one of the columns
w = rwcc.beginMemstoreInsert();
KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(),
KeyValue.Type.DeleteColumn);
kvDel.setMemstoreTS(w.getWriteNumber());
memstore.add(kvDel);
// BEFORE COMPLETING DELETE, SEE FIRST KVS
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// COMPLETE DELETE
rwcc.completeMemstoreInsert(w);
// NOW WE SHOULD SEE DELETE
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12});
}
private static class ReadOwnWritesTester extends Thread { private static class ReadOwnWritesTester extends Thread {
static final int NUM_TRIES = 1000; static final int NUM_TRIES = 1000;

View File

@ -25,6 +25,10 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.mockito.Mockito;
import org.mockito.stubbing.OngoingStubbing;
import com.google.common.collect.Lists;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -136,6 +140,7 @@ public class TestStoreScanner extends TestCase {
* Test test shows exactly how the matcher's return codes confuses the StoreScanner * Test test shows exactly how the matcher's return codes confuses the StoreScanner
* and prevent it from doing the right thing. Seeking once, then nexting twice * and prevent it from doing the right thing. Seeking once, then nexting twice
* should return R1, then R2, but in this case it doesnt. * should return R1, then R2, but in this case it doesnt.
* TODO this comment makes no sense above. Appears to do the right thing.
* @throws IOException * @throws IOException
*/ */
public void testWontNextToNext() throws IOException { public void testWontNextToNext() throws IOException {
@ -430,4 +435,21 @@ public class TestStoreScanner extends TestCase {
assertEquals(false, scanner.next(results)); assertEquals(false, scanner.next(results));
} }
/**
* TODO this fails, since we don't handle deletions, etc, in peek
*/
public void SKIP_testPeek() throws Exception {
KeyValue [] kvs = new KeyValue [] {
KeyValueTestUtil.create("R1", "cf", "a", 1, KeyValue.Type.Put, "dont-care"),
KeyValueTestUtil.create("R1", "cf", "a", 1, KeyValue.Type.Delete, "dont-care"),
};
List<KeyValueScanner> scanners = scanFixture(kvs);
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
getCols("a"), scanners);
assertNull(scan.peek());
}
} }