From fef909299afcc72e677a11288d68454eada2ce04 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Tue, 15 Jun 2010 22:37:56 +0000 Subject: [PATCH] HBASE-2670. Provide atomicity for readers even when new insert has same timestamp as current row. git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@955076 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../org/apache/hadoop/hbase/KeyValue.java | 15 +- .../hadoop/hbase/regionserver/Store.java | 5 - .../hadoop/hbase/MultithreadedTestUtil.java | 6 + .../hadoop/hbase/TestAcidGuarantees.java | 175 +++++++++++++++--- .../hbase/regionserver/TestMemStore.java | 129 ++++++++++++- .../hbase/regionserver/TestStoreScanner.java | 22 +++ 7 files changed, 319 insertions(+), 35 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 43bcf488547..78c7ba06518 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -386,6 +386,8 @@ Release 0.21.0 - Unreleased HBASE-2712 Cached region location that went stale won't recover if asking for first row HBASE-2732 TestZooKeeper was broken, HBASE-2691 showed it + HBASE-2670 Provide atomicity for readers even when new insert has + same timestamp as current row. IMPROVEMENTS HBASE-1760 Cleanup TODOs in HTable diff --git a/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 71284cfa1da..702e4e710a6 100644 --- a/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Comparator; +import com.google.common.primitives.Longs; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.io.HeapSize; @@ -553,7 +554,12 @@ public class KeyValue implements Writable, HeapSize { public KeyValue clone() { byte [] b = new byte[this.length]; System.arraycopy(this.bytes, this.offset, b, 0, this.length); - return new KeyValue(b, 0, b.length); + KeyValue ret = new KeyValue(b, 0, b.length); + // Important to clone the memstoreTS as well - otherwise memstore's + // update-in-place methods (eg increment) will end up creating + // new entries + ret.setMemstoreTS(memstoreTS); + return ret; } //--------------------------------------------------------------------------- @@ -1294,10 +1300,13 @@ public class KeyValue implements Writable, HeapSize { } public int compare(final KeyValue left, final KeyValue right) { - return getRawComparator().compare(left.getBuffer(), + int ret = getRawComparator().compare(left.getBuffer(), left.getOffset() + ROW_OFFSET, left.getKeyLength(), - right.getBuffer(), right.getOffset() + ROW_OFFSET, + right.getBuffer(), right.getOffset() + ROW_OFFSET, right.getKeyLength()); + if (ret != 0) return ret; + // Negate this comparison so later edits show up first + return -Longs.compare(left.getMemstoreTS(), right.getMemstoreTS()); } public int compareTimestamps(final KeyValue left, final KeyValue right) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 2a0dceed462..1a604da1464 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -57,16 +57,11 @@ import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.NavigableMap; import java.util.NavigableSet; import java.util.Set; import java.util.SortedSet; -import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.locks.ReentrantReadWriteLock; diff --git a/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java b/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java index 7c062d7ffbb..6675fac7143 100644 --- a/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java +++ b/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java @@ -22,10 +22,15 @@ package org.apache.hadoop.hbase; import java.util.Set; import java.util.HashSet; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; public abstract class MultithreadedTestUtil { + public static final Log LOG = + LogFactory.getLog(MultithreadedTestUtil.class); + public static class TestContext { private final Configuration conf; private Throwable err = null; @@ -74,6 +79,7 @@ public abstract class MultithreadedTestUtil { public synchronized void threadFailed(Throwable t) { if (err == null) err = t; + LOG.error("Failed!", err); notify(); } diff --git a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java index 75f3c8b4dc4..33767ae9f23 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java +++ b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java @@ -33,11 +33,21 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Ignore; import org.junit.Test; import com.google.common.collect.Lists; +/** + * Test case that uses multiple threads to read and write multifamily rows + * into a table, verifying that reads never see partially-complete writes. + * + * This can run as a junit test, or with a main() function which runs against + * a real cluster (eg for testing with failures, region movement, etc) + */ public class TestAcidGuarantees { protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class); public static final byte [] TABLE_NAME = Bytes.toBytes("TestAcidGuarantees"); @@ -62,25 +72,33 @@ public class TestAcidGuarantees { } public TestAcidGuarantees() { - util = new HBaseTestingUtility(); + // Set small flush size for minicluster so we exercise reseeking scanners + Configuration conf = HBaseConfiguration.create(); + conf.set("hbase.hregion.memstore.flush.size", String.valueOf(128*1024)); + util = new HBaseTestingUtility(conf); } + /** + * Thread that does random full-row writes into a table. + */ public static class AtomicityWriter extends RepeatingTestThread { Random rand = new Random(); byte data[] = new byte[10]; - byte targetRow[]; + byte targetRows[][]; byte targetFamilies[][]; HTable table; AtomicLong numWritten = new AtomicLong(); - public AtomicityWriter(TestContext ctx, byte targetRow[], + public AtomicityWriter(TestContext ctx, byte targetRows[][], byte targetFamilies[][]) throws IOException { super(ctx); - this.targetRow = targetRow; + this.targetRows = targetRows; this.targetFamilies = targetFamilies; table = new HTable(ctx.getConf(), TABLE_NAME); } public void doAnAction() throws Exception { + // Pick a random row to write into + byte[] targetRow = targetRows[rand.nextInt(targetRows.length)]; Put p = new Put(targetRow); rand.nextBytes(data); @@ -95,14 +113,18 @@ public class TestAcidGuarantees { } } - public static class AtomicityReader extends RepeatingTestThread { + /** + * Thread that does single-row reads in a table, looking for partially + * completed rows. + */ + public static class AtomicGetReader extends RepeatingTestThread { byte targetRow[]; byte targetFamilies[][]; HTable table; int numVerified = 0; AtomicLong numRead = new AtomicLong(); - public AtomicityReader(TestContext ctx, byte targetRow[], + public AtomicGetReader(TestContext ctx, byte targetRow[], byte targetFamilies[][]) throws IOException { super(ctx); this.targetRow = targetRow; @@ -114,7 +136,13 @@ public class TestAcidGuarantees { Get g = new Get(targetRow); Result res = table.get(g); byte[] gotValue = null; - + if (res.getRow() == null) { + // Trying to verify but we didn't find the row - the writing + // thread probably just hasn't started writing yet, so we can + // ignore this action + return; + } + for (byte[] family : targetFamilies) { for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { byte qualifier[] = Bytes.toBytes("col" + i); @@ -143,25 +171,99 @@ public class TestAcidGuarantees { throw new RuntimeException(msg.toString()); } } + + /** + * Thread that does full scans of the table looking for any partially completed + * rows. + */ + public static class AtomicScanReader extends RepeatingTestThread { + byte targetFamilies[][]; + HTable table; + AtomicLong numScans = new AtomicLong(); + AtomicLong numRowsScanned = new AtomicLong(); + + public AtomicScanReader(TestContext ctx, + byte targetFamilies[][]) throws IOException { + super(ctx); + this.targetFamilies = targetFamilies; + table = new HTable(ctx.getConf(), TABLE_NAME); + } + + public void doAnAction() throws Exception { + Scan s = new Scan(); + for (byte[] family : targetFamilies) { + s.addFamily(family); + } + ResultScanner scanner = table.getScanner(s); + + for (Result res : scanner) { + byte[] gotValue = null; + + for (byte[] family : targetFamilies) { + for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { + byte qualifier[] = Bytes.toBytes("col" + i); + byte thisValue[] = res.getValue(family, qualifier); + if (gotValue != null && !Bytes.equals(gotValue, thisValue)) { + gotFailure(gotValue, res); + } + gotValue = thisValue; + } + } + numRowsScanned.getAndIncrement(); + } + numScans.getAndIncrement(); + } + + private void gotFailure(byte[] expected, Result res) { + StringBuilder msg = new StringBuilder(); + msg.append("Failed after ").append(numRowsScanned).append("!"); + msg.append("Expected=").append(Bytes.toStringBinary(expected)); + msg.append("Got:\n"); + for (KeyValue kv : res.list()) { + msg.append(kv.toString()); + msg.append(" val= "); + msg.append(Bytes.toStringBinary(kv.getValue())); + msg.append("\n"); + } + throw new RuntimeException(msg.toString()); + } + } - public void runTestAtomicity(long millisToRun) throws Exception { + public void runTestAtomicity(long millisToRun, + int numWriters, + int numGetters, + int numScanners, + int numUniqueRows) throws Exception { createTableIfMissing(); TestContext ctx = new TestContext(util.getConfiguration()); - byte row[] = Bytes.toBytes("test_row"); - + + byte rows[][] = new byte[numUniqueRows][]; + for (int i = 0; i < numUniqueRows; i++) { + rows[i] = Bytes.toBytes("test_row_" + i); + } + List writers = Lists.newArrayList(); - for (int i = 0; i < 5; i++) { - AtomicityWriter writer = new AtomicityWriter(ctx, row, FAMILIES); + for (int i = 0; i < numWriters; i++) { + AtomicityWriter writer = new AtomicityWriter( + ctx, rows, FAMILIES); writers.add(writer); ctx.addThread(writer); } - List readers = Lists.newArrayList(); - for (int i = 0; i < 5; i++) { - AtomicityReader reader = new AtomicityReader(ctx, row, FAMILIES); - readers.add(reader); - ctx.addThread(reader); + List getters = Lists.newArrayList(); + for (int i = 0; i < numGetters; i++) { + AtomicGetReader getter = new AtomicGetReader( + ctx, rows[i % numUniqueRows], FAMILIES); + getters.add(getter); + ctx.addThread(getter); + } + + List scanners = Lists.newArrayList(); + for (int i = 0; i < numScanners; i++) { + AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES); + scanners.add(scanner); + ctx.addThread(scanner); } ctx.startThreads(); @@ -173,26 +275,53 @@ public class TestAcidGuarantees { LOG.info(" wrote " + writer.numWritten.get()); } LOG.info("Readers:"); - for (AtomicityReader reader : readers) { + for (AtomicGetReader reader : getters) { LOG.info(" read " + reader.numRead.get()); } + LOG.info("Scanners:"); + for (AtomicScanReader scanner : scanners) { + LOG.info(" scanned " + scanner.numScans.get()); + LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); + } } @Test - public void testAtomicity() throws Exception { - util.startMiniCluster(3); + public void testGetAtomicity() throws Exception { + util.startMiniCluster(1); try { - runTestAtomicity(20000); + runTestAtomicity(20000, 5, 5, 0, 3); } finally { util.shutdownMiniCluster(); } } - + + @Test + @Ignore("Currently not passing - see HBASE-2670") + public void testScanAtomicity() throws Exception { + util.startMiniCluster(1); + try { + runTestAtomicity(20000, 5, 0, 5, 3); + } finally { + util.shutdownMiniCluster(); + } + } + + @Test + @Ignore("Currently not passing - see HBASE-2670") + public void testMixedAtomicity() throws Exception { + util.startMiniCluster(1); + try { + runTestAtomicity(20000, 5, 2, 2, 3); + } finally { + util.shutdownMiniCluster(); + } + } + public static void main(String args[]) throws Exception { Configuration c = HBaseConfiguration.create(); TestAcidGuarantees test = new TestAcidGuarantees(); test.setConf(c); - test.runTestAtomicity(5*60*1000); + test.runTestAtomicity(5*60*1000, 5, 2, 2, 3); } private void setConf(Configuration c) { diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 9833d768ffe..4f27ea51777 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.rmi.UnexpectedException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; @@ -38,6 +39,10 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + /** memstore test case */ public class TestMemStore extends TestCase { private final Log LOG = LogFactory.getLog(this.getClass()); @@ -204,11 +209,18 @@ public class TestMemStore extends TestCase { private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) throws IOException { scanner.seek(KeyValue.createFirstOnRow(new byte[]{})); - for (KeyValue kv : expected) { - assertTrue(0 == - KeyValue.COMPARATOR.compare(kv, - scanner.next())); + List returned = Lists.newArrayList(); + + while (true) { + KeyValue next = scanner.next(); + if (next == null) break; + returned.add(next); } + + assertTrue( + "Got:\n" + Joiner.on("\n").join(returned) + + "\nExpected:\n" + Joiner.on("\n").join(expected), + Iterables.elementsEqual(Arrays.asList(expected), returned)); assertNull(scanner.peek()); } @@ -252,6 +264,115 @@ public class TestMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv1, kv2}); } + /** + * Regression test for HBASE-2616, HBASE-2670. + * When we insert a higher-memstoreTS version of a cell but with + * the same timestamp, we still need to provide consistent reads + * for the same scanner. + */ + public void testMemstoreEditsVisibilityWithSameKey() throws IOException { + final byte[] row = Bytes.toBytes(1); + final byte[] f = Bytes.toBytes("family"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] v1 = Bytes.toBytes("value1"); + final byte[] v2 = Bytes.toBytes("value2"); + + // INSERT 1: Write both columns val1 + ReadWriteConsistencyControl.WriteEntry w = + rwcc.beginMemstoreInsert(); + + KeyValue kv11 = new KeyValue(row, f, q1, v1); + kv11.setMemstoreTS(w.getWriteNumber()); + memstore.add(kv11); + + KeyValue kv12 = new KeyValue(row, f, q2, v1); + kv12.setMemstoreTS(w.getWriteNumber()); + memstore.add(kv12); + rwcc.completeMemstoreInsert(w); + + // BEFORE STARTING INSERT 2, SEE FIRST KVS + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + KeyValueScanner s = this.memstore.getScanners().get(0); + assertScannerResults(s, new KeyValue[]{kv11, kv12}); + + // START INSERT 2: Write both columns val2 + w = rwcc.beginMemstoreInsert(); + KeyValue kv21 = new KeyValue(row, f, q1, v2); + kv21.setMemstoreTS(w.getWriteNumber()); + memstore.add(kv21); + + KeyValue kv22 = new KeyValue(row, f, q2, v2); + kv22.setMemstoreTS(w.getWriteNumber()); + memstore.add(kv22); + + // BEFORE COMPLETING INSERT 2, SEE FIRST KVS + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + s = this.memstore.getScanners().get(0); + assertScannerResults(s, new KeyValue[]{kv11, kv12}); + + // COMPLETE INSERT 2 + rwcc.completeMemstoreInsert(w); + + // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS. + // See HBASE-1485 for discussion about what we should do with + // the duplicate-TS inserts + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + s = this.memstore.getScanners().get(0); + assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12}); + } + + /** + * When we insert a higher-memstoreTS deletion of a cell but with + * the same timestamp, we still need to provide consistent reads + * for the same scanner. + */ + public void testMemstoreDeletesVisibilityWithSameKey() throws IOException { + final byte[] row = Bytes.toBytes(1); + final byte[] f = Bytes.toBytes("family"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] v1 = Bytes.toBytes("value1"); + // INSERT 1: Write both columns val1 + ReadWriteConsistencyControl.WriteEntry w = + rwcc.beginMemstoreInsert(); + + KeyValue kv11 = new KeyValue(row, f, q1, v1); + kv11.setMemstoreTS(w.getWriteNumber()); + memstore.add(kv11); + + KeyValue kv12 = new KeyValue(row, f, q2, v1); + kv12.setMemstoreTS(w.getWriteNumber()); + memstore.add(kv12); + rwcc.completeMemstoreInsert(w); + + // BEFORE STARTING INSERT 2, SEE FIRST KVS + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + KeyValueScanner s = this.memstore.getScanners().get(0); + assertScannerResults(s, new KeyValue[]{kv11, kv12}); + + // START DELETE: Insert delete for one of the columns + w = rwcc.beginMemstoreInsert(); + KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), + KeyValue.Type.DeleteColumn); + kvDel.setMemstoreTS(w.getWriteNumber()); + memstore.add(kvDel); + + // BEFORE COMPLETING DELETE, SEE FIRST KVS + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + s = this.memstore.getScanners().get(0); + assertScannerResults(s, new KeyValue[]{kv11, kv12}); + + // COMPLETE DELETE + rwcc.completeMemstoreInsert(w); + + // NOW WE SHOULD SEE DELETE + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + s = this.memstore.getScanners().get(0); + assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12}); + } + + private static class ReadOwnWritesTester extends Thread { static final int NUM_TRIES = 1000; diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index 0566af7de08..2bbc0f0fd0b 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -25,6 +25,10 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import org.mockito.Mockito; +import org.mockito.stubbing.OngoingStubbing; + +import com.google.common.collect.Lists; import java.io.IOException; import java.util.ArrayList; @@ -136,6 +140,7 @@ public class TestStoreScanner extends TestCase { * Test test shows exactly how the matcher's return codes confuses the StoreScanner * and prevent it from doing the right thing. Seeking once, then nexting twice * should return R1, then R2, but in this case it doesnt. + * TODO this comment makes no sense above. Appears to do the right thing. * @throws IOException */ public void testWontNextToNext() throws IOException { @@ -430,4 +435,21 @@ public class TestStoreScanner extends TestCase { assertEquals(false, scanner.next(results)); } + + + /** + * TODO this fails, since we don't handle deletions, etc, in peek + */ + public void SKIP_testPeek() throws Exception { + KeyValue [] kvs = new KeyValue [] { + KeyValueTestUtil.create("R1", "cf", "a", 1, KeyValue.Type.Put, "dont-care"), + KeyValueTestUtil.create("R1", "cf", "a", 1, KeyValue.Type.Delete, "dont-care"), + }; + List scanners = scanFixture(kvs); + Scan scanSpec = new Scan(Bytes.toBytes("R1")); + StoreScanner scan = + new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR, + getCols("a"), scanners); + assertNull(scan.peek()); + } }